单个源生产者到多个并行工作的消费者
本文关键字:工作 消费者 并行 生产者 单个源 | 更新日期: 2023-09-27 18:06:36
我想要有一种队列,其中单个源在其中输入数据,另一边将有消费者等待,当他们检测到队列不为空时将开始执行数据,直到他们停止。但重要的是,如果队列被清空,它们仍将继续监视队列,这样如果有更多的数据弹出,它们将能够使用它。我发现通过多个消费者和多个生产者,因为消费者嵌套在生产者中,在我的情况下,我不能这样做,因为我将有一个单一的来源和消费者提交到队列,直到我停止它们。因此,不是串行执行,而是消费者和生产者并行执行。
将通过
并行执行消费者和生产者。Parallel.Invoke(() => producer(), () => consumers());
问题是我将如何执行队列的内容,有时是空的并行
使用BlockingCollection<T>
可以相对容易地解决这个问题。
您可以使用一个作为队列,并将对它的引用传递给producer()
和每个consumers()
。
您将从每个消费者线程调用GetConsumingEnumerable()
,并与foreach
一起使用。
生产者线程将向集合中添加物品,并在完成生产物品时调用CompleteAdding()
。这将自动使所有的消费者线程退出它们的foreach循环。
Thread.Sleep()
的调用是模拟加载,不应该在实际代码中使用。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
internal class Program
{
private static void Main(string[] args)
{
ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
var plant = new ProcessingPlant();
plant.Process();
Console.WriteLine("Work complete.");
}
}
public sealed class ProcessingPlant
{
private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();
public void Process()
{
Parallel.Invoke(producer, consumers);
}
private void producer()
{
for (int i = 0; i < 100; ++i)
{
string item = i.ToString();
Console.WriteLine("Producer is queueing {0}", item);
_queue.Add(item); // <- Here's where we add an item to the queue.
Thread.Sleep(0);
}
_queue.CompleteAdding(); // <- Here's where we make all the consumers
} // exit their foreach loops.
private void consumers()
{
Parallel.Invoke(
() => consumer(1),
() => consumer(2),
() => consumer(3),
() => consumer(4),
() => consumer(5)
);
}
private void consumer(int id)
{
Console.WriteLine("Consumer {0} is starting.", id);
foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
{
Console.WriteLine("Consumer {0} read {1}", id, item);
Thread.Sleep(0);
}
Console.WriteLine("Consumer {0} is stopping.", id);
}
}
}
(我知道这是使用一个额外的线程来启动消费者,但我这样做是为了避免混淆真正的要点—这是为了演示BlockingCollection的使用。)