在.net中使用Interlocked类进行多线程的正确方法
本文关键字:多线程 方法 net Interlocked | 更新日期: 2023-09-27 18:13:36
我有一个计数器,用于计算当前处理的大型报表
private int processedLargeReports;
,我正在生成并启动五个线程,其中每个线程访问这个方法:
public bool GenerateReport(EstimatedReportSize reportSize)
{
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
bool allowLargeReports = (this.processedLargeReports < Settings.Default.LargeReportLimit);
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (reportOrderNextInQueue.IsProcessing)
{
Interlocked.Increment(ref this.processedLargeReports);
}
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (reportOrderNextInQueue.IsProcessing)
{
Interlocked.Decrement(ref this.processedLargeReports);
}
return works;
}
"reportOrderNextInQueue"变量从数据库获取报告订单,并检查报告订单是"Normal"还是"Large"(这是通过定义reportOrderNextInQueue变量的bool IsProcessing属性来实现的)。如果是大报表,系统然后Interlock增加processedLargeReport int并处理大报表。一旦处理大报表,系统将互锁减少该值。
整个想法是我一次只允许处理一个报告,所以一旦一个线程处理一个大报告,其他线程就不应该能够访问数据库中的大报告。bool allowLargeReport变量检查processsedlargereports int和是否超过限制。
我很好奇这是否是正确的实现,因为我不能在星期一之前测试它。我不确定是否必须使用InterLocked类或只是将processedLargeReports变量定义为volatile成员。
假设您有5个线程开始运行上面的代码,并且LargeReportLimit为1。它们都将读取processedLargeReports为0,allowLargeReports为true,并且它们将同时开始处理5个项目,尽管您的限制是1。如果我理解正确的话,我真的不明白这段代码是如何达到你的目标的。
稍微扩展一下:你读取 processedLargeReports,然后对其执行操作(使用它来检查是否应该允许处理报告)。您表现得好像这个变量不能在read和act之间更改,但事实并非如此。在读取和操作变量之间,任何数量的线程都可以使用processedLargeReports做任何事情,因为没有锁定。在这种情况下,互锁只会确保processedLargeReports在所有线程完成处理所有任务后始终为0,但仅此而已。
如果您需要限制对某些资源的并发访问-只需为此使用适当的工具:Semaphore或SemaphoreSlim类。创建允许使用LargeReportLimit线程的信号量。在处理报告之前,等待你的信号量。如果达到处理报告的并发线程数,则会阻塞。处理完成后,释放信号量以允许等待的线程进入。此处不需要使用Interlocked类
volatile
不提供线程安全。与通常的多线程一样,您需要一些同步-它可以基于Interlocked
, lock
或任何其他同步原语,并取决于您的需要。你选择了Interlocked
——很好,但是你有一个竞争条件。在任何同步块之外读取processedLargeReports
字段,并根据该值做出决策。但在你读完之后,它可能会立即改变,所以整个逻辑不起作用。正确的方法是始终执行Interlocked.Increment
并将逻辑基于返回值。像这样:
首先,让我们为字段
使用更好的名称private int processingLargeReports;
和
public bool GenerateReport(EstimatedReportSize reportSize)
{
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
bool allowLargeReports =
(Interlocked.Increment(ref this.processingLargeReports) <= Settings.Default.LargeReportLimit);
if (!allowLargeReports)
Interlocked.Decrement(ref this.processingLargeReports);
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (allowLargeReports && !reportOrderNextInQueue.IsProcessing)
Interlocked.Decrement(ref this.processingLargeReports);
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (allowLargeReports && reportOrderNextInQueue.IsProcessing)
Interlocked.Decrement(ref this.processingLargeReports);
return works;
}
注意,这也包含竞争条件,但是保留了您的LargeReportLimit约束。
编辑:现在当我在想,因为你的处理是基于允许和是大报告,Interlocked
不是一个好的选择,更好地使用基于Monitor
的方法,如:
private int processingLargeReports;
private object processingLargeReportsLock = new object();
private void AcquireProcessingLargeReportsLock(ref bool lockTaken)
{
Monitor.Enter(this.processingLargeReportsLock, ref lockTaken);
}
private void ReleaseProcessingLargeReportsLock(ref bool lockTaken)
{
if (!lockTaken) return;
Monitor.Exit(this.processingLargeReportsLock);
lockTaken = false;
}
public bool GenerateReport(EstimatedReportSize reportSize)
{
bool lockTaken = false;
try
{
this.AcquireProcessingLargeReportsLock(ref lockTaken);
bool allowLargeReports = (this.processingLargeReports < Settings.Default.LargeReportLimit);
if (!allowLargeReports)
{
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
var currentDateTime = DateTimeFactory.Instance.DateTimeNow;
var reportOrderNextInQueue = this.ReportOrderLogic.GetNextReportOrderAndLock(
currentDateTime.AddHours(
this.timeoutValueInHoursBeforeReleaseLock),
reportSize,
CorrelationIdForPickingReport,
allowLargeReports);
if (reportOrderNextInQueue.IsProcessing)
{
this.processingLargeReports++;
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
var currentReport = this.GetReportToBeWorked(reportOrderNextInQueue);
var works = this.WorkTheReport(reportOrderNextInQueue, currentReport, currentDateTime);
if (reportOrderNextInQueue.IsProcessing)
{
this.AcquireProcessingLargeReportsLock(ref lockTaken);
this.processingLargeReports--;
}
return works;
}
finally
{
this.ReleaseProcessingLargeReportsLock(ref lockTaken);
}
}