.Net 4.0中的多生产者/消费者交互
本文关键字:生产者 消费者 交互 Net | 更新日期: 2023-09-27 18:23:58
我正在使用BlockingCollection处理一些文件,然后将它们上传到服务器。
现在我有一个Producer,它可以递归文件系统并将某些文件压缩到临时位置。完成一个文件后,它会将我自己的对象添加到BlockingCollection中,其中包含信息,如文件名、文件路径、修改日期等。然后消费者抓住这个对象并使用它上传文件。当生产者完成对文件系统的搜索和对文件的处理后,它会调用BlockingCollection.CompleteAdding()方法向消费者发出它已经完成的信号。
我想做的是将生产者的数量增加到2个或更多。原因是压缩过程需要一段时间,而在多核处理器上,我只使用1核。这导致生产者有时在更快的网络上落后于消费者。
我的问题是,当我有多个生产者而只有一个消费者时,我如何向消费者发出所有生产者都完成了工作的信号?如果我在其中一个生产者上调用BlockingCollection.CompleteAdding()方法,我仍然可以有一个或多个其他生产者在工作。
在调用BlockingCollection.CompleteAdding()
之前,可以在Producer
代码中使用信号量。信号量由所有Producer
实例共享,当最后一个生产者完成时,它可以调用该方法。信号量可以实现为一个简单的计数器,在创建生产者时递增计数器,在生产者结束其作业时递减计数器。如果计数器达到零,则可以调用BlockingCollection.CompleteAdding()
。
我使用这样的东西来拥有多个生产者和消费者。这只是一个非常简单的解决方案,没有针对生产代码进行优化。
public class ManageBatchProcessing
{
private BlockingCollection<Action> blockingCollection;
public void Process()
{
blockingCollection = new BlockingCollection<Action>();
int numberOfBatches = 10;
Process(HandleProducers, HandleConsumers, numberOfBatches);
}
private void Process(Action<int> produce, Action<int> consume, int numberOfBatches)
{
produce(numberOfBatches);
consume(numberOfBatches);
}
private void HandleConsumers(int numberOfBatches)
{
var consumers = new List<Task>();
for (var i = 1; i <= numberOfBatches; i++)
{
consumers.Add(Task.Factory.StartNew(() =>
{
foreach (var action in blockingCollection.GetConsumingEnumerable())
{
action();
}
}));
}
Task.WaitAll(consumers.ToArray());
}
private void HandleProducers(int numberOfBatches)
{
var producers = new List<Task>();
for (var i = 0; i <= numberOfBatches; i++)
{
producers.Add(Task.Factory.StartNew(() =>
{
blockingCollection.Add(() => YourProdcerMethod());
}));
}
Task.WaitAll(producers.ToArray());
blockingCollection.CompleteAdding();
}
}