在c#中,等待阻塞集合(队列)的大小减小
本文关键字:队列 集合 等待 | 更新日期: 2023-09-27 18:02:30
我正在做一个项目,有以下工作流程:
第一部分:
- 事件异步到达并在阻塞队列中排队,我们将其称为Q1
- 线程从队列中获取下一个可用项目
- 项目最终并行运行{N}个任务
- 每个任务将其结果在第二个队列上排队,我们将其称为Q2。
- 当一个项目处理完毕时,下一个项目从队列中读取。
两部分:
- 另一个线程每次读取一个Q2对象并处理结果
所以,这里的问题是,第一个队列上的每个项目最终并行运行大量任务,每个任务将其结果排队。第二个队列必须串行地处理,每次处理一个项目,并且它正在被淹没。
我的问题
我需要一种机制,使线程处理Q1等待,直到Q2中的项数低于特定阈值。实现这一目标的最佳方式是什么?是否有办法使用事件驱动的解决方案而不是轮询解决方案?
您可以使用BlockingCollection<T>
代替Queue<T>
。如果设置了BoundedCapacity
,则呼叫Q2.Add()
将在达到容量时阻塞。这将自动限制Q1的处理,因为如果N个任务不能添加到最终队列中,它们将开始阻塞。
我假设您在偶尔的洪水中接收数据,在长时间的干旱期间Q2可以赶上。您是否考虑过通过为这些任务使用有限的线程池来简单地限制从Q1派生的并发线程的数量?
如果作业大小在到达时很容易确定,那么我怀疑您可以从多个线程池中获益。您可以使用少量线程来处理大型作业,而使用大量线程来处理小型作业。甚至第三个中间队列也可能是有益的。你的问题似乎是一个完美的例子来解决TPL Dataflow
库。如果你愿意尝试一下,下面是它的工作原理(当然这是一个非常简单的例子):
TransformBlock<int, bool> transform = new TransformBlock<int, bool>(i => i > 5 ? true : false,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
ActionBlock<bool> resultBlock = new ActionBlock<bool>(b => Console.WriteLine("My result is : " + b),
new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
transform.LinkTo(resultBlock);
你正在定义一个转换块,它将使你的转换(这工作作为你的Q1
),你可以设置它的并行级别,你想要使用的任务的数量。
Q2
),它将设置BoundedCapacity
并同步处理每个消息,为每个元素调用操作。这个块可以被任何其他块替代,比如BufferBlock
,它允许您根据需要从它轮询。