使用 BlockingCollection 作为单一生产者、单一消费者 FIFO 查询是否好
本文关键字:单一 消费者 FIFO 查询 是否 生产者 使用 BlockingCollection | 更新日期: 2023-09-27 18:30:55
我需要单生产者、单消费者 FIFO 查询,因为
- 我需要按照消息接收的顺序处理消息。
- 我需要异步执行此操作,因为调用方在我处理消息时不应等待。
- 仅当上一个消息处理完成后,才应启动下一个消息处理。有时,"接收"消息的频率高于"处理"消息的频率。但平均而言,我应该能够处理所有消息,只是有时我必须"排队"它们。
所以我认为这很像TCP/IP,你有一个生产者和一个消费者,有时你可以比处理更快的速度接收消息,所以你必须查询它们。在哪里顺序很重要,在哪里调用者绝对不感兴趣你用这些东西做什么。
这听起来很简单,我可能可以使用一般Queue
,但我想为此使用 BlockingCollection
,因为我不想用ManualResetEvent
等编写任何代码。
BlockingCollection
如何适合我的任务,也许您可以提出其他建议?
BlockingCollection
类实现了IProducerConsumerCollection接口,因此完全符合您的要求。
您可以创建两个任务,一个用于异步生产者,另一个作为使用者工作线程。前者会将项目添加到BlockingCollection
,而后者只是在FIFO订单中有新的可用时立即消耗。
使用 TPL 任务和阻塞集合的生产者-消费者示例应用程序:
class ProducerConsumer
{
private static BlockingCollection<string> queue = new BlockingCollection<string>();
static void Main(string[] args)
{
Start();
}
public static void Start()
{
var producerWorker = Task.Factory.StartNew(() => RunProducer());
var consumerWorker = Task.Factory.StartNew(() => RunConsumer());
Task.WaitAll(producerWorker, consumerWorker);
}
private static void RunProducer()
{
int itemsCount = 100;
while (itemsCount-- > 0)
{
queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
Thread.Sleep(250);
}
}
private static void RunConsumer()
{
foreach (var item in queue.GetConsumingEnumerable())
{
Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
}
}
}
IProducerConsumerCollection:
定义用于操作用于 生产者/消费者使用情况。此接口提供统一的 代表生产者/消费者收藏,以便更高层次 抽象,例如 System.Collections.Concurrent.BlockingCollection(Of T) 可以使用 集合作为底层存储机制。
既然你需要一个队列,为什么不坚持排队呢?您可以使用同步队列。