线程安全可重入队列使用peek

本文关键字:peek 队列 安全 线程 | 更新日期: 2023-09-27 18:02:40

我的基本问题是,如果队列为空,需要立即处理队列中的项目,或者向队列中添加一个项目,如果已经有一个项目正在处理,则离开。

我正在尝试一种使用peek来简化事情的技术,我想知道可能会有什么障碍。谢谢!

    void SequenceAction(Action action) {
       bool go = false;
       lock (_RaiseEventQueueLock) {
          _RaiseEventQueue.Enqueue(action);
          go = (_RaiseEventQueue.Count == 1); 
       }
       // 'go' can only be true if queue was empty when queue 
       //  was locked and item was enqueued.
       while (go) {
          #if naive_threadsafe_presumption 
          // Peek is threadsafe because in effect this loop owns
          //  the zeroeth item in the queue for as long as the queue 
          //  remains non-empty.
          (_RaiseEventQueue.Peek())();
          #else
          Action a;
          lock (_RaiseEventQueueLock) {
             a = _RaiseEventQueue.Peek();
          }
          a();
          #endif   
          // Lock the queue to see if any item was enqueued while
          //  the zeroeth item was being processed.
          // Note that the code processing an item can call this 
          //  function reentrantly, adding to its own todo list
          //  while insuring that that each item is processed 
          //  to completion.
          lock (_RaiseEventQueueLock) {
             _RaiseEventQueue.Dequeue();
             go = (_RaiseEventQueue.Count > 0); 
          }
       }
    }

线程安全可重入队列使用peek

实际上,您的Peek不是线程安全的。向队列中添加一个项目可能会导致后备存储(最终是一个数组)的大小被调整。我想象队列是在一个循环缓冲区中实现的,有头和尾索引用于插入和删除。

那么想象一下,如果队列中有16个项目会发生什么。插入是在8,删除是在9。队列已经满了。然后发生以下情况:

  1. 线程A调用Peek,获取Remove索引(9)。
  2. 线程A被换出。
  3. 线程B调用Enqueue并看到它必须增加队列。
  4. 线程B分配一个包含32个项目的新数组,并从现有数组中复制数据。数据按顺序复制,从Remove开始并环绕。
  5. 线程B设置Remove为0,Insert为16。
  6. 线程A获得下一个时间片并返回位置9的项目。
  7. 您刚刚按顺序处理了一个事件,您最终将重新处理它。
  8. 更糟糕的是,您将删除位置0的项目而不处理它。

可能可以解决这个问题:

Action nextAction;
lock (_RaiseEventQueueLock)
{
    nextAction = _RaiseEventQueue.Peek();
}
nextAction();
但我不会拿我的职业生涯来赌。我建议使用BlockingCollection和生产者/消费者设计。

可能修复

我想到下面的方法应该能达到你的目的。

private readonly object _queueLock = new object();
private readonly object _processLock = new object();
void SequenceAction(Action action)
{
    lock (_queueLock)
    {
        _RaiseEventQueue.Enqueue(action);
    }
    if (Monitor.TryEnter(_processLock))
    {
        while (true)
        {
            Action a;
            lock (_queueLock)
            {
                if (_RaiseEventQueue.Count == 0) return;
                a = _RaiseEventQueue.Dequeue();
            }
            a();
        }
        Monitor.Exit(_processLock);
    }
}
    // If action already in progress, add new
    //  action to queue and return.
    // If no action in progress, begin processing
    //  the new action and continue processing
    //  actions added to the queue in the meantime.
    void SequenceAction(Action action) {
       lock (_SequenceActionQueueLock) {
          _SequenceActionQueue.Enqueue(action);
          if (_SequenceActionQueue.Count > 1) {
             return;
          }
       }
       // Would have returned if queue was not empty
       //  when queue was locked and item was enqueued.
       for (;;) {
          action();
          lock (_SequenceActionQueueLock) {
             _SequenceActionQueue.Dequeue();
             if (_SequenceActionQueue.Count == 0) {
                return;
             }
             action = _SequenceActionQueue.Peek();
          }
       }
    }