如何使用blockingcollection解决生产者/消费者竞争条件
本文关键字:消费者 竞争 条件 生产者 何使用 blockingcollection 解决 | 更新日期: 2023-09-27 18:16:53
我正在实现一个记录器,它将记录写入数据库。为了防止数据库写入阻塞调用日志记录器的代码,我将数据库访问移动到一个单独的线程,使用基于BlockingCollection<string>
的生产者/消费者模型实现。
下面是简化的实现:
abstract class DbLogger : TraceListener
{
private readonly BlockingCollection<string> _buffer;
private readonly Task _writerTask;
DbLogger()
{
this._buffer = new BlockingCollection<string>(new ConcurrentQueue<string>(), 1000);
this._writerTask = Task.Factory.StartNew(this.ProcessBuffer, TaskCreationOptions.LongRunning);
}
// Enqueue the msg.
public void LogMessage(string msg) { this._buffer.Add(msg); }
private void ProcessBuffer()
{
foreach (string msg in this._buffer.GetConsumingEnumerable())
{
this.WriteToDb(msg);
}
}
protected abstract void WriteToDb(string msg);
protected override void Dispose(bool disposing)
{
if (disposing)
{
// Signal to the blocking collection that the enumerator is done.
this._buffer.CompleteAdding();
// Wait for any in-progress writes to finish.
this._writerTask.Wait(timeout);
this._buffer.Dispose();
}
base.Dispose(disposing);
}
}
现在,当我的应用程序关闭时,我需要确保缓冲区在数据库连接关闭之前被刷新。否则,WriteToDb
将抛出异常。
那么,这是我的朴素Flush实现:
public void Flush()
{
// Sleep until the buffer is empty.
while(this._buffer.Count > 0)
{
Thread.Sleep(50);
}
}
这个实现的问题是以下事件序列:
- 缓冲区中有一个条目。
- 在日志线程中,
MoveNext()
在枚举器上被调用,所以我们现在在ProcessBuffer
的foreach
循环体中。 -
Flush()
被主线程调用。它看到集合为空,因此立即返回。 - 主线程关闭数据库连接 回到日志线程中,
foreach
循环开始执行。WriteToDb
被调用,并且由于DB连接已关闭而失败。private volatile bool _isWritingBuffer = false;
private void ProcessBuffer()
{
foreach (string msg in this._buffer.GetConsumingEnumerable())
{
lock (something) this._isWritingBuffer = true;
this.WriteToDb(msg);
lock (something) this._isWritingBuffer = false;
}
}
public void Flush()
{
// Sleep until the buffer is empty.
bool isWritingBuffer;
lock(something) isWritingBuffer = this._isWritingBuffer;
while(this._buffer.Count > 0 || isWritingBuffer)
{
Thread.Sleep(50);
}
}
然而,仍然存在一个竞争条件,因为整个Flush()
方法可以在集合为空之后,但在_isWritingBuffer
被设置为true
之前执行。
我如何修复我的Flush
实现来避免这种竞争条件?
注意:由于各种原因,我必须从头开始编写日志记录器,所以请不要建议我使用一些现有的日志记录框架。
首先永远不要锁定公共对象,尤其是this
。
此外,永远不要使用裸布尔值进行同步:如果你想了解可能出现的问题,请参阅我的博客同步,内存可见性和泄漏的抽象:)
关于问题本身,我一定是错过了一些东西,但为什么你需要这样的Flush
方法?
当你完成你的日志记录时,你将通过从主线程调用它的Dispose
方法来处理日志记录器。
并且你已经实现了这样一种方式,它将等待"写入数据库"任务。
如果我错了,你真的需要与另一个原语同步,那么你应该使用事件:
在DbLogger
:
public ManualResetEvent finalizing { get; set; }
public void Flush()
{
finalizing.WaitOne();
}
在某处,例如在ProcessBuffer
你通知当你完成写入DB:
finalizing.Set();