.Net 4.0中的多生产者/消费者交互

本文关键字:生产者 消费者 交互 Net | 更新日期: 2023-09-27 18:23:58

我正在使用BlockingCollection处理一些文件,然后将它们上传到服务器。

现在我有一个Producer,它可以递归文件系统并将某些文件压缩到临时位置。完成一个文件后,它会将我自己的对象添加到BlockingCollection中,其中包含信息,如文件名、文件路径、修改日期等。然后消费者抓住这个对象并使用它上传文件。当生产者完成对文件系统的搜索和对文件的处理后,它会调用BlockingCollection.CompleteAdding()方法向消费者发出它已经完成的信号。

我想做的是将生产者的数量增加到2个或更多。原因是压缩过程需要一段时间,而在多核处理器上,我只使用1核。这导致生产者有时在更快的网络上落后于消费者。

我的问题是,当我有多个生产者而只有一个消费者时,我如何向消费者发出所有生产者都完成了工作的信号?如果我在其中一个生产者上调用BlockingCollection.CompleteAdding()方法,我仍然可以有一个或多个其他生产者在工作。

.Net 4.0中的多生产者/消费者交互

在调用BlockingCollection.CompleteAdding()之前,可以在Producer代码中使用信号量。信号量由所有Producer实例共享,当最后一个生产者完成时,它可以调用该方法。信号量可以实现为一个简单的计数器,在创建生产者时递增计数器,在生产者结束其作业时递减计数器。如果计数器达到零,则可以调用BlockingCollection.CompleteAdding()

我使用这样的东西来拥有多个生产者和消费者。这只是一个非常简单的解决方案,没有针对生产代码进行优化。

public class ManageBatchProcessing 
{
    private  BlockingCollection<Action> blockingCollection;
    public void Process()
    {
        blockingCollection = new BlockingCollection<Action>();
        int numberOfBatches = 10;
        Process(HandleProducers, HandleConsumers, numberOfBatches);
    }
    private void Process(Action<int> produce, Action<int> consume, int numberOfBatches)
    {
        produce(numberOfBatches);
        consume(numberOfBatches);
    }
    private void HandleConsumers(int numberOfBatches)
    {
        var consumers = new List<Task>();
        for (var i = 1; i <= numberOfBatches; i++)
        {
            consumers.Add(Task.Factory.StartNew(() =>
            {
                foreach (var action in blockingCollection.GetConsumingEnumerable())
                {
                    action();
                }
            }));
        }
        Task.WaitAll(consumers.ToArray());
    }
    private void HandleProducers(int numberOfBatches)
    {
        var producers = new List<Task>();
        for (var i = 0; i <= numberOfBatches; i++)
        {
            producers.Add(Task.Factory.StartNew(() =>
            {
                blockingCollection.Add(() => YourProdcerMethod());
            }));
        }
        Task.WaitAll(producers.ToArray());
        blockingCollection.CompleteAdding();
    }
}