在c#中,等待阻塞集合(队列)的大小减小

本文关键字:队列 集合 等待 | 更新日期: 2023-09-27 18:02:30

我正在做一个项目,有以下工作流程:

第一部分:

  • 事件异步到达并在阻塞队列中排队,我们将其称为Q1
  • 线程从队列中获取下一个可用项目
  • 项目最终并行运行{N}个任务
  • 每个任务将其结果在第二个队列上排队,我们将其称为Q2
  • 当一个项目处理完毕时,下一个项目从队列中读取。

两部分:

  • 另一个线程每次读取一个Q2对象并处理结果

所以,这里的问题是,第一个队列上的每个项目最终并行运行大量任务,每个任务将其结果排队。第二个队列必须串行地处理,每次处理一个项目,并且它正在被淹没。


我的问题

我需要一种机制,使线程处理Q1等待,直到Q2中的项数低于特定阈值。实现这一目标的最佳方式是什么?是否有办法使用事件驱动的解决方案而不是轮询解决方案?

在c#中,等待阻塞集合(队列)的大小减小

您可以使用BlockingCollection<T>代替Queue<T>。如果设置了BoundedCapacity,则呼叫Q2.Add()将在达到容量时阻塞。这将自动限制Q1的处理,因为如果N个任务不能添加到最终队列中,它们将开始阻塞。

我假设您在偶尔的洪水中接收数据,在长时间的干旱期间Q2可以赶上。您是否考虑过通过为这些任务使用有限的线程池来简单地限制从Q1派生的并发线程的数量?

如果作业大小在到达时很容易确定,那么我怀疑您可以从多个线程池中获益。您可以使用少量线程来处理大型作业,而使用大量线程来处理小型作业。甚至第三个中间队列也可能是有益的。

你的问题似乎是一个完美的例子来解决TPL Dataflow库。如果你愿意尝试一下,下面是它的工作原理(当然这是一个非常简单的例子):

TransformBlock<int, bool> transform = new TransformBlock<int, bool>(i => i > 5 ? true : false,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
ActionBlock<bool> resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
transform.LinkTo(resultBlock);

你正在定义一个转换块,它将使你的转换(这工作作为你的Q1),你可以设置它的并行级别,你想要使用的任务的数量。

然后,您正在创建第二个块(作为您的Q2),它将设置BoundedCapacity并同步处理每个消息,为每个元素调用操作。这个块可以被任何其他块替代,比如BufferBlock,它允许您根据需要从它轮询。