使用4.0框架类和block Collection在c#中实现生产者/混合消费者

本文关键字:实现 生产者 消费者 混合 Collection 框架 block 使用 | 更新日期: 2023-09-27 17:50:14

我有一个生产者/消费者场景。生产者永远不会停止,这意味着即使在BC中有一段时间没有项目,以后也可以添加更多的项目。

从。net Framework 3.5到4.0,我决定使用BlockingCollection作为消费者和生产者之间的并发队列。我甚至添加了一些并行扩展,以便我可以将BC与Parallel.ForEach一起使用。

问题是,在消费者线程中,我需要一种混合模型:

  1. 我总是检查BC来处理任何带有Parallel.ForEach(bc.GetConsumingEnumerable(), item => etc
  2. 在这个foreach中,我执行所有不依赖于彼此的任务。
  3. 问题来了。在并行化之前的任务后,我需要以相同的FIFO顺序管理它们在BC中的结果。这些结果的处理应该在同步线程中进行。

下面是一个伪代码的小例子:

制作人:

//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
     //The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}
消费者:

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Parallel ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode. T
                    });
                    //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.
                }
            });
}
            

显然,这不能工作,因为.GetConsumingEnumerable()阻塞,直到BC中有另一个项目。我想我可以用任务来做,在同一个批处理中只执行4或5个任务,但是:

  1. 我怎么能对任务这样做,并且在任务开始之前仍然有一个等待点,直到在BC中有一个项目被消耗(我不想在没有任何东西的情况下开始处理)。一旦在BC中有东西,我就会开始4个任务的批处理,并在每个任务中使用TryTake,所以如果没有什么可采取的,它们就不会阻塞,因为我不知道我是否总能达到从BC作为批任务的项目数量,例如,在BC中只剩下一个项目和一批4个任务)?
  2. 我怎么才能做到这一点,并利用并行的效率。提供吗?
  3. 我如何保存任务的结果在相同的FIFO顺序,其中项目是从BC中提取的?
  4. 有没有其他的并发类更适合于在消费者中对项目进行这种混合处理?
  5. 此外,这是我在StackOverflow中提出的第一个问题,所以如果你需要更多的数据,或者你只是认为我的问题不正确,请告诉我。

使用4.0框架类和block Collection在c#中实现生产者/混合消费者

我想我遵循你的问题,为什么不创建一个ConcurrentBag并添加到它,而处理像这样:

while (!_cancelTasks)
{
   //BlockingCollections with Paralell ForEach
   var bc = _concurrentCollection;
   var q = new ConcurrentBag<ScannedPage>();
   Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
   {
      ScannedPage currentPage = item;
      q.Add(item);
      // process a batch of images from the bc and check if an image has a valid barcode. T
   });
 //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

  //process items in your list here by sorting using some sequence key
  var items = q.OrderBy( o=> o.SeqNbr).ToList();
  foreach( var item in items){
     ...
  }
}

这显然不会按照添加到BC的确切顺序将它们排队,但是您可以像Alex建议的那样向ScannedPage对象添加一些序列nbr,然后对结果进行排序。

我是这样处理这个序列的:

将此添加到ScannedPage类:

public static int _counter;  //public because this is just an example but it would work.

获取序列nbr并在这里赋值:

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
    lock( this){   //to single thread this process.. not necessary if it's already single threaded of course.
    System.Threading.Interlocked.Increment( ref ScannedPage._counter);
    scannedPage.SeqNbr = ScannedPage._counter;
    ...
    }
}

当您需要并行操作的结果时,使用PLINQ通常比使用Parallel类更方便。以下是使用PLINQ重构代码的方法:

private void Init()
{
    _cancelTasks = new CancellationTokenSource();
    _checkTask = Task.Run(() =>
    {
        while (true)
        {
            _cancelTasks.Token.ThrowIfCancellationRequested();
            var bc = _concurrentCollection;
            var partitioner = Partitioner.Create(
                bc.GetConsumingEnumerable(_cancelTasks.Token),
                EnumerablePartitionerOptions.NoBuffering);
            ScannedPage[] results = partitioner
                .AsParallel()
                .AsOrdered()
                .Select(scannedPage =>
                {
                    // Process the scannedPage
                    return scannedPage;
                })
                .ToArray();
            // Process the results
        }
    });
}

.AsOrdered()确保您将获得与输入相同顺序的结果。

请注意,当您使用Parallel类或PLINQ使用BlockingCollection<T>时,重要的是使用PartitionerEnumerablePartitionerOptions.NoBuffering配置,否则会有死锁的风险。Parallel/PLINQ的默认贪心行为和BlockingCollection<T>的阻塞行为不能很好地交互