在.net中使用Interlocked类进行多线程的正确方法

本文关键字:多线程 方法 net Interlocked | 更新日期: 2023-09-27 18:13:36

我有一个计数器,用于计算当前处理的大型报表

private int processedLargeReports;

,我正在生成并启动五个线程,其中每个线程访问这个方法:

public bool GenerateReport(EstimatedReportSize reportSize)
{
    var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
    bool allowLargeReports = (this.processedLargeReports < Settings.Default.LargeReportLimit);
    var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
        currentDateTime.AddHours(
        this.timeoutValueInHoursBeforeReleaseLock), 
        reportSize, 
        CorrelationIdForPickingReport, 
        allowLargeReports);
    if (reportOrderNextInQueue.IsProcessing)
    {
        Interlocked.Increment(ref this.processedLargeReports);                
    }
    var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
    var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
    if (reportOrderNextInQueue.IsProcessing)
    {
        Interlocked.Decrement(ref this.processedLargeReports);                
    }
    return works;           
}

"reportOrderNextInQueue"变量从数据库获取报告订单,并检查报告订单是"Normal"还是"Large"(这是通过定义reportOrderNextInQueue变量的bool IsProcessing属性来实现的)。如果是大报表,系统然后Interlock增加processedLargeReport int并处理大报表。一旦处理大报表,系统将互锁减少该值。

整个想法是我一次只允许处理一个报告,所以一旦一个线程处理一个大报告,其他线程就不应该能够访问数据库中的大报告。bool allowLargeReport变量检查processsedlargereports int和是否超过限制。

我很好奇这是否是正确的实现,因为我不能在星期一之前测试它。我不确定是否必须使用InterLocked类或只是将processedLargeReports变量定义为volatile成员。

在.net中使用Interlocked类进行多线程的正确方法

假设您有5个线程开始运行上面的代码,并且LargeReportLimit为1。它们都将读取processedLargeReports为0,allowLargeReports为true,并且它们将同时开始处理5个项目,尽管您的限制是1。如果我理解正确的话,我真的不明白这段代码是如何达到你的目标的。

稍微扩展一下:你读取 processedLargeReports,然后对其执行操作(使用它来检查是否应该允许处理报告)。您表现得好像这个变量不能在read和act之间更改,但事实并非如此。在读取和操作变量之间,任何数量的线程都可以使用processedLargeReports做任何事情,因为没有锁定。在这种情况下,互锁只会确保processedLargeReports在所有线程完成处理所有任务后始终为0,但仅此而已。

如果您需要限制对某些资源的并发访问-只需为此使用适当的工具:Semaphore或SemaphoreSlim类。创建允许使用LargeReportLimit线程的信号量。在处理报告之前,等待你的信号量。如果达到处理报告的并发线程数,则会阻塞。处理完成后,释放信号量以允许等待的线程进入。此处不需要使用Interlocked类

volatile不提供线程安全。与通常的多线程一样,您需要一些同步-它可以基于Interlocked, lock或任何其他同步原语,并取决于您的需要。你选择了Interlocked——很好,但是你有一个竞争条件。在任何同步块之外读取processedLargeReports字段,并根据该值做出决策。但在你读完之后,它可能会立即改变,所以整个逻辑不起作用。正确的方法是始终执行Interlocked.Increment并将逻辑基于返回值。像这样:

首先,让我们为字段

使用更好的名称
private int processingLargeReports;

public bool GenerateReport(EstimatedReportSize reportSize)
{
    var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
    bool allowLargeReports = 
       (Interlocked.Increment(ref this.processingLargeReports) <= Settings.Default.LargeReportLimit);
    if (!allowLargeReports)
        Interlocked.Decrement(ref this.processingLargeReports);
    var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
        currentDateTime.AddHours(
        this.timeoutValueInHoursBeforeReleaseLock), 
        reportSize, 
        CorrelationIdForPickingReport, 
        allowLargeReports);
    if (allowLargeReports && !reportOrderNextInQueue.IsProcessing)
        Interlocked.Decrement(ref this.processingLargeReports);
    var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
    var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
    if (allowLargeReports && reportOrderNextInQueue.IsProcessing)
        Interlocked.Decrement(ref this.processingLargeReports);
    return works;           
}

注意,这也包含竞争条件,但是保留了您的LargeReportLimit约束。

编辑:现在当我在想,因为你的处理是基于允许大报告,Interlocked不是一个好的选择,更好地使用基于Monitor的方法,如:

private int processingLargeReports;
private object processingLargeReportsLock = new object();
private void AcquireProcessingLargeReportsLock(ref bool lockTaken)
{
    Monitor.Enter(this.processingLargeReportsLock, ref lockTaken); 
}
private void ReleaseProcessingLargeReportsLock(ref bool lockTaken)
{
    if (!lockTaken) return;
    Monitor.Exit(this.processingLargeReportsLock);
    lockTaken = false;
}
public bool GenerateReport(EstimatedReportSize reportSize)
{
    bool lockTaken = false;
    try
    {
        this.AcquireProcessingLargeReportsLock(ref lockTaken); 
        bool allowLargeReports = (this.processingLargeReports < Settings.Default.LargeReportLimit);
        if (!allowLargeReports)
        {
            this.ReleaseProcessingLargeReportsLock(ref lockTaken);
        }
        var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
        var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
            currentDateTime.AddHours(
            this.timeoutValueInHoursBeforeReleaseLock), 
            reportSize, 
            CorrelationIdForPickingReport, 
            allowLargeReports);
        if (reportOrderNextInQueue.IsProcessing)
        {
            this.processingLargeReports++;
            this.ReleaseProcessingLargeReportsLock(ref lockTaken);
        }            
        var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
        var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
        if (reportOrderNextInQueue.IsProcessing)
        {
            this.AcquireProcessingLargeReportsLock(ref lockTaken); 
            this.processingLargeReports--;
        }            
        return works;
    }
    finally
    {
        this.ReleaseProcessingLargeReportsLock(ref lockTaken);
    }           
}