ConcurrentBag的正确用法是什么?

本文关键字:是什么 用法 ConcurrentBag | 更新日期: 2023-09-27 18:04:55

我已经阅读了之前关于ConcurrentBag的问题,但没有找到多线程实现的实际示例。

ConcurrentBag是一个线程安全的包实现,针对同一线程将生产和消费存储在包中的数据的场景进行了优化。"

目前这是我代码中的当前用法(这是简化的,而不是实际的代码):

private void MyMethod()
{
    List<Product> products = GetAllProducts(); // Get list of products
    ConcurrentBag<Product> myBag = new ConcurrentBag<Product>();
    //products were simply added here in the ConcurrentBag to simplify the code
    //actual code process each product before adding in the bag
    Parallel.ForEach(
                products,
                new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
                product => myBag.Add(product));
    ProcessBag(myBag); // method to process each items in the concurrentbag
}

我的问题:这是ConcurrentBag的正确用法吗?在这种情况下可以使用ConcurrentBag吗?

对我来说,我认为一个简单的List<Product>和手动锁会做得更好。这样做的原因是,上面的场景已经打破了"同一个线程将生产和消费存储在包中的数据"规则。此外,我还发现,在并行的每个线程中创建的ThreadLocal存储在操作后仍然存在(即使线程被重用,这是对的吗?),这可能会导致不希望的内存泄漏。我说的对吗,伙计们?或者一个简单的明确或空的方法来删除项目在ConcurrentBag是足够的?

ConcurrentBag的正确用法是什么?

这看起来是对ConcurrentBag的一个很好的使用。线程局部变量是包的成员,在包被回收的同时,它们也有资格被垃圾收集(清除内容不会释放它们)。您是对的,一个带锁的简单List就足以满足您的情况。如果在循环中所做的工作非常重要,那么线程同步的类型对整体性能影响不大。在这种情况下,您可能更愿意使用您熟悉的内容。

另一个选择是使用ParallelEnumerable。选择,它与您想要做的事情更接近。同样,您将看到的任何性能差异可能都可以忽略不计,坚持使用您所知道的并没有什么错。

一如既往,如果它的性能是关键的,没有什么可以替代尝试和测量。

在我看来bmm60 's是不正确的。ConcurrentBag实例内部包含每个向其添加项目的线程的迷你包,因此项目插入不涉及任何线程锁,因此所有Environment.ProcessorCount线程都可以进入full - swing,而不需要等待,也不需要任何线程上下文切换。在迭代收集的项时可能需要线程同步,但是在原始示例中,迭代是在所有插入完成后由单个线程完成的。此外,如果ConcurrentBag使用互锁技术作为线程同步的第一层,那么可能根本不涉及Monitor操作。

另一方面,使用通常的List<T>实例并使用lock关键字包装其Add()方法调用将严重损害性能。首先,由于Monitor.Enter()Monitor.Exit()调用是恒定的,每个调用都需要深入到内核模式并使用Windows同步原语。其次,有时候一个线程偶尔会被第二个线程阻塞,因为第二个线程还没有完成它的添加。

对我来说,上面的代码是正确使用ConcurrentBag类的一个很好的例子。

这是ConcurrentBag的正确用法吗?在这种情况下可以使用ConcurrentBag吗?

没有,原因有很多:

  1. 这不是这个集合的预期使用场景。ConcurrentBag<T>用于混合生产者-消费者场景,这意味着每个线程都希望添加从包中取出物品。你的场景完全不是这样的。您有许多添加项的线程,而没有获取项的线程。ConcurrentBag<T>的主要应用是创建对象池(创建或销毁可重用对象的池)。考虑到Microsoft.Extensions.ObjectPool包中ObjectPool<T>类的可用性,甚至这个适合这个集合的小众应用程序也存在争议。
  2. 不保留插入顺序。即使保持插入顺序并不重要,得到一个混乱的输出也会使调试更加困难。
  3. 它创建必须由GC收集的垃圾。它为每个线程创建一个WorkStealingQueue(内部类),每个类包含一个可扩展数组,因此线程越多,分配的对象就越多。此外,每次枚举时,它都会复制一个新数组中的所有项,并在每个foreach上显示一个IEnumerator<T> GetEnumerator()属性。
  4. 有更好的选择,提供更好的性能和更好的订购行为。

在您的场景中,您可以将并行执行的结果存储在一个简单的数组中。只需创建一个长度等于products.Count的数组,从Parallel.ForEach切换到Parallel.For,并将结果直接分配给results数组的相应槽位,而根本不做任何同步:

List<Product> products = GetAllProducts(); // Get list of products
Product[] results = Product[products.Count];
Parallel.For(0, products.Count,
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    i => results[i] = products[i]);
ProcessResults(results);

这样你将得到完美排序的结果,存储在一个容器中,它具有最紧凑的大小和所有。net集合中最快的枚举,只做一个对象分配。

如果您担心上述操作的线程安全性,则无需担心。每个线程在results数组中的不同槽上写入。在并行执行完成后,当前线程可以完全看到存储在数组中的所有值,因为TPL在任务排队时以及任务执行的开始/结束时包含了适当的barrier(引用)。

(我在这个答案中发布了更多关于ConcurrentBag<T>的想法)

如果List<T>Add()方法周围的锁一起使用,它将使线程等待,并将降低使用Parallel.ForEach()的性能增益