线程安全可重入队列使用peek
本文关键字:peek 队列 安全 线程 | 更新日期: 2023-09-27 18:02:40
我的基本问题是,如果队列为空,需要立即处理队列中的项目,或者向队列中添加一个项目,如果已经有一个项目正在处理,则离开。
我正在尝试一种使用peek来简化事情的技术,我想知道可能会有什么障碍。谢谢!
void SequenceAction(Action action) {
bool go = false;
lock (_RaiseEventQueueLock) {
_RaiseEventQueue.Enqueue(action);
go = (_RaiseEventQueue.Count == 1);
}
// 'go' can only be true if queue was empty when queue
// was locked and item was enqueued.
while (go) {
#if naive_threadsafe_presumption
// Peek is threadsafe because in effect this loop owns
// the zeroeth item in the queue for as long as the queue
// remains non-empty.
(_RaiseEventQueue.Peek())();
#else
Action a;
lock (_RaiseEventQueueLock) {
a = _RaiseEventQueue.Peek();
}
a();
#endif
// Lock the queue to see if any item was enqueued while
// the zeroeth item was being processed.
// Note that the code processing an item can call this
// function reentrantly, adding to its own todo list
// while insuring that that each item is processed
// to completion.
lock (_RaiseEventQueueLock) {
_RaiseEventQueue.Dequeue();
go = (_RaiseEventQueue.Count > 0);
}
}
}
实际上,您的Peek
不是线程安全的。向队列中添加一个项目可能会导致后备存储(最终是一个数组)的大小被调整。我想象队列是在一个循环缓冲区中实现的,有头和尾索引用于插入和删除。
那么想象一下,如果队列中有16个项目会发生什么。插入是在8,删除是在9。队列已经满了。然后发生以下情况:
- 线程A调用Peek,获取Remove索引(9)。
- 线程A被换出。
- 线程B调用Enqueue并看到它必须增加队列。 线程B分配一个包含32个项目的新数组,并从现有数组中复制数据。数据按顺序复制,从Remove开始并环绕。
- 线程B设置Remove为0,Insert为16。
- 线程A获得下一个时间片并返回位置9的项目。
- 您刚刚按顺序处理了一个事件,您最终将重新处理它。
- 更糟糕的是,您将删除位置0的项目而不处理它。
你可能可以解决这个问题:
Action nextAction;
lock (_RaiseEventQueueLock)
{
nextAction = _RaiseEventQueue.Peek();
}
nextAction();
但我不会拿我的职业生涯来赌。我建议使用BlockingCollection和生产者/消费者设计。
可能修复
我想到下面的方法应该能达到你的目的。
private readonly object _queueLock = new object();
private readonly object _processLock = new object();
void SequenceAction(Action action)
{
lock (_queueLock)
{
_RaiseEventQueue.Enqueue(action);
}
if (Monitor.TryEnter(_processLock))
{
while (true)
{
Action a;
lock (_queueLock)
{
if (_RaiseEventQueue.Count == 0) return;
a = _RaiseEventQueue.Dequeue();
}
a();
}
Monitor.Exit(_processLock);
}
}
// If action already in progress, add new
// action to queue and return.
// If no action in progress, begin processing
// the new action and continue processing
// actions added to the queue in the meantime.
void SequenceAction(Action action) {
lock (_SequenceActionQueueLock) {
_SequenceActionQueue.Enqueue(action);
if (_SequenceActionQueue.Count > 1) {
return;
}
}
// Would have returned if queue was not empty
// when queue was locked and item was enqueued.
for (;;) {
action();
lock (_SequenceActionQueueLock) {
_SequenceActionQueue.Dequeue();
if (_SequenceActionQueue.Count == 0) {
return;
}
action = _SequenceActionQueue.Peek();
}
}
}