当生产者也是使用者时,如何在生产者/使用者模式中使用阻塞集合 - 如何结束

本文关键字:使用者 生产者 集合 何结束 结束 模式 | 更新日期: 2023-09-27 18:35:56

我有一个递归问题,消费者在树的每个级别做一些工作,然后需要递归树并在下一个级别执行相同的工作。

我想使用 ConcurrentBag/BlockingCollection 等并行运行它。在这种情况下,队列的使用者也是队列的生产者!

我的问题是这样的:使用 BlockingCollection ,我可以编写非常简单的foreach逻辑来取消排队项目,并对新项目进行排队 - 当队列为空时,阻塞集合将正确阻塞,并等待其他消费者之一产生新工作。

但是我怎么知道是否所有的消费者都在阻止?!

我知道CompleteAdding(),但这似乎不起作用,因为您真正完成的唯一时间是当所有制作人都完成生产并且队列为空时 - 并且由于它们都会阻塞,因此没有人"自由"设置CompleteAdding()。有没有办法检测到这一点?(也许是一个可以在阻止时触发,并在解锁时再次触发的事件?

我可以手动处理这个问题,不使用foreach,而是手动使用while(!complete)循环并使用TryTake,但随后我需要手动休眠,这似乎效率低下(首先拥有阻塞集合与仅并发集合的全部原因!每次通过循环时,如果TryTake为 false,我可以设置一个 Idle 标志,然后让 Master 检查队列是否为空,并且所有线程都空闲,设置一个完整的标志,但同样,这似乎很笨拙。

直觉告诉我有一些方法可以使用BlockingCollection来做到这一点,但我不能完全到达那里。

无论如何,任何人都有一个很好的模式,当消费者是生产者,并且能够检测到何时释放所有区块将是非常棒的。

当生产者也是使用者时,如何在生产者/使用者模式中使用阻塞集合 - 如何结束

这是一个类似于 BlockingCollection<T> 的集合的低级实现,不同之处在于它会自动完成,而不是依赖于手动调用 CompleteAdding 方法。自动完成的条件是集合为空,并且所有使用者都处于等待状态。

/// <summary>
/// A blocking collection that completes automatically when it's empty and all
/// consuming enumerables are in a waiting state.
/// </summary>
public class AutoCompleteBlockingCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private int _consumersCount = 0;
    private int _waitingConsumers = 0;
    private bool _autoCompleteStarted;
    private bool _completed;
    public int Count { get { lock (_queue) return _queue.Count; } }
    public bool IsCompleted => Volatile.Read(ref _completed);
    public void Add(T item)
    {
        lock (_queue)
        {
            if (_completed) throw new InvalidOperationException(
                "The collection has completed.");
            _queue.Enqueue(item);
            Monitor.Pulse(_queue);
        }
    }
    /// <summary>
    /// Begin observing the condition for automatic completion.
    /// </summary>
    public void BeginObservingAutoComplete()
    {
        lock (_queue)
        {
            if (_autoCompleteStarted) return;
            _autoCompleteStarted = true;
            Monitor.PulseAll(_queue);
        }
    }
    public IEnumerable<T> GetConsumingEnumerable()
    {
        bool waiting = false;
        lock (_queue) _consumersCount++;
        try
        {
            while (true)
            {
                T item;
                lock (_queue)
                {
                    if (_completed) yield break;
                    while (_queue.Count == 0)
                    {
                        if (_autoCompleteStarted &&
                            _waitingConsumers == _consumersCount - 1)
                        {
                            _completed = true;
                            Monitor.PulseAll(_queue);
                            yield break;
                        }
                        waiting = true; _waitingConsumers++;
                        Monitor.Wait(_queue);
                        waiting = false; _waitingConsumers--;
                        if (_completed) yield break;
                    }
                    item = _queue.Dequeue();
                }
                yield return item;
            }
        }
        finally
        {
            lock (_queue)
            {
                _consumersCount--;
                if (waiting) _waitingConsumers--;
                if (!_completed && _autoCompleteStarted &&
                    _waitingConsumers == _consumersCount)
                {
                    _completed = true;
                    Monitor.PulseAll(_queue);
                }
            }
        }
    }
}

AutoCompleteBlockingCollection<T>仅提供BlockingCollection<T>类的最基本功能。 不支持有限容量和取消等功能。

使用示例:

var queue = new AutoCompleteBlockingCollection<Node>();
queue.Add(rootNode);
queue.BeginObservingAutoComplete();
Task[] workers = Enumerable.Range(1, 4).Select(_ => Task.Run(() =>
{
    foreach (Node node in queue.GetConsumingEnumerable())
    {
        Process(node);
        foreach (Node child in node.Children)
            queue.Add(child);
    }
})).ToArray();
await Task.WhenAll(workers);

应在集合中添加初始项后调用 BeginObservingAutoComplete 方法。在调用此方法之前,不会检查自动完成条件。在上面的示例中,在开始观察自动完成条件之前只添加一个项目。然后启动四个工作线程,每个工作线程使用集合,处理每个使用的节点,然后将此节点的子节点添加到集合中。最终,树的所有节点都将被消耗,最后一个活动工作线程将触发收集的自动完成。这将允许所有工作线程退出消费循环并完成。

支持随时(动态)添加和删除使用者。该集合是线程安全的。

可以在此处找到上述集合的功能丰富但效率较低的实现。