什么时候使用BlockingCollection,什么时候使用ConcurrentBag代替List<

本文关键字:什么时候 List 代替 BlockingCollection ConcurrentBag | 更新日期: 2023-09-27 18:03:43

对于"为什么这是平行的"这个问题的公认答案。每个代码都冻结程序?建议在WPF应用程序中用ConcurrentBag代替List的用法。

我想了解是否可以在这种情况下使用BlockingCollection ?

什么时候使用BlockingCollection,什么时候使用ConcurrentBag代替List<

您确实可以使用BlockingCollection,但这样做绝对没有意义。

首先,请注意BlockingCollection是实现IProducerConsumerCollection<T>的集合的包装器。任何实现该接口的类型都可以用作底层存储:

创建BlockingCollection<T>对象时,可以指定not只有有限的容量,还有要使用的集合类型。为例如,您可以为first in指定一个ConcurrentQueue<T>对象,先进先出(FIFO)行为,或者最后一个ConcurrentStack<T>对象进先出(LIFO)行为。您可以使用任何集合类实现IProducerConsumerCollection<T>接口。默认的BlockingCollection<T>的集合类型为ConcurrentQueue<T>

这包括ConcurrentBag<T>,这意味着您可以有一个阻塞并发包。那么,普通IProducerConsumerCollection<T>和阻塞集合之间的区别是什么呢?BlockingCollection的文档说(强调我的):

BlockingCollection<T>用作一个包装器IProducerConsumerCollection<T> instance, 允许删除尝试从集合到块,直到有可用的数据被删除。类似地,可以创建BlockingCollection<T>强制对象中允许的数据元素数的上限IProducerConsumerCollection<T>[…]

因为在链接的问题中没有必要做这些事情,使用BlockingCollection只是添加了一个未使用的功能层。

  • List<T>是一个设计用于单线程的集合应用。

  • ConcurrentBag<T>是一类Collections.Concurrent命名空间设计的简化在多线程环境中使用集合。如果你使用ConcurrentCollection,您将不必锁定您的收集,以防止其他线程损坏。你可以插入或者从集合中获取数据,而不需要编写特殊的锁定代码。

  • BlockingCollection<T>的设计是为了消除检查线程间共享集合中是否有新数据的需求。如果有新数据插入到共享集合中,那么消费者线程将立即唤醒。因此,通常在while循环中,您不必在特定的时间间隔内检查消费者线程是否有新数据可用。

当您发现需要一个线程安全的List<T>时,在大多数情况下,ConcurrentBag<T>BlockingCollection<T>都不是您的最佳选择。这两个集合都专门用于促进生产者-消费者场景,因此,除非您有多个线程并发地添加从集合中删除项,否则您应该寻找其他选项(在大多数情况下,最佳候选是ConcurrentQueue<T>)。

特别是关于ConcurrentBag<T>,它是一个非常专门的类,针对混合生产者-消费者场景。这意味着每个工作线程都被期望同时是生产者和消费者(添加从同一集合中删除项目)。可能是ObjectPool类内部存储的一个很好的候选,但除此之外,很难想象这个类有什么有利的使用场景。

人们通常认为ConcurrentBag<T>是线程安全的List<T>的等价物,但它不是。这两个api的相似之处令人产生误解。调用AddList<T>的结果是在列表的末尾添加一个项目。将Add调用到ConcurrentBag<T>的结果是将物品添加到包内的随机插槽中。ConcurrentBag<T>基本上是无序的。它没有针对枚举进行优化,并且在命令它这样做时做得很糟糕。它在内部维护一堆线程本地队列,因此其内容的顺序由哪个线程做了什么决定,而不是由什么时候发生了什么决定。在每次枚举ConcurrentBag<T>之前,所有这些线程本地队列都被复制到一个数组中,这给垃圾收集器增加了压力(源代码)。因此,例如var item = bag.First();行会产生整个集合的副本,因为只返回一个元素。

这些特性使得ConcurrentBag<T>不是存储Parallel.For/Parallel.ForEach循环结果的理想选择。

ConcurrentQueue<T>.Enqueue方法是List<T>.Add的一个更好的线程安全替代品。"Enqueue"是一个比"Add"不太熟悉的词,但它实际上做了您期望它做的事情。

没有ConcurrentBag<T>能做ConcurrentQueue<T>不能做的事。例如,两个集合都不提供从集合中删除特定项的方法。如果您想要一个具有key参数的TryRemove方法的并发集合,您可以查看ConcurrentDictionary<K,V>类。

ConcurrentBag<T>经常出现在微软文档中与任务并行库相关的示例中。比如这里。不管是谁写的文档,显然他们更看重编写Add而不是Enqueue的微小可用性优势,而不是使用错误集合的行为/性能劣势。考虑到这些示例是在TPL刚刚出现的时候编写的,并且目标是让大多数不熟悉并行编程的开发人员快速采用该库,这在一定程度上是有道理的。我明白了,Enqueue这个词在你第一次看到的时候是很吓人的。不幸的是,现在有整整一代开发人员已经将ConcurrentBag<T>纳入他们的思维工具中,尽管考虑到这一点,它并不存在这个集合有多专门化。

如果您想以与源元素完全相同的顺序收集Parallel.ForEach循环的结果,您可以使用lock保护的List<T>。在大多数情况下,开销可以忽略不计,特别是在循环内部的工作很繁重的情况下。下面是一个示例,使用Select LINQ操作符获取每个元素的索引。

var indexedSource = source.Select((item, index) => (item, index));
List<TResult> results = new();
Parallel.ForEach(indexedSource, parallelOptions, entry =>
{
    var (item, index) = entry;
    TResult result = GetResult(item);
    lock (results)
    {
        while (results.Count <= index) results.Add(default);
        results[index] = result;
    }
});

这是针对source是一个未知大小的延迟序列的情况。如果事先知道它的大小,就更简单了。只需预先分配一个TResult[]数组,并在不加锁的情况下并行更新它:

TResult[] results = new TResult[source.Count];
Parallel.For(0, source.Count, parallelOptions, i =>
{
    results[i] = GetResult(source[i]);
});

TPL在任务执行结束时包含内存屏障,因此results数组的所有值将在当前线程中可见(引用)。

是的,您可以使用BlockingCollectionfinishedProxies定义为:

BlockingCollection<string> finishedProxies = new BlockingCollection<string>();

,要添加一项,你可以这样写:

finishedProxies.Add(checkResult);

完成后,您可以根据内容创建一个列表