使用 BlockingCollection 作为单一生产者、单一消费者 FIFO 查询是否好

本文关键字:单一 消费者 FIFO 查询 是否 生产者 使用 BlockingCollection | 更新日期: 2023-09-27 18:30:55

我需要单生产者、单消费者 FIFO 查询,因为

  • 我需要按照消息接收的顺序处理消息。
  • 我需要异步执行此操作,因为调用方在我处理消息时不应等待。
  • 仅当上一个消息处理完成后,才应启动下一个消息处理。有时,"接收"消息的频率高于"处理"消息的频率。但平均而言,我应该能够处理所有消息,只是有时我必须"排队"它们。

所以我认为这很像TCP/IP,你有一个生产者和一个消费者,有时你可以比处理更快的速度接收消息,所以你必须查询它们。在哪里顺序很重要,在哪里调用者绝对不感兴趣你用这些东西做什么。

这听起来很简单,我可能可以使用一般Queue,但我想为此使用 BlockingCollection,因为我不想用ManualResetEvent等编写任何代码。

BlockingCollection如何适合我的任务,也许您可以提出其他建议?

使用 BlockingCollection<T> 作为单一生产者、单一消费者 FIFO 查询是否好

BlockingCollection类实现了IProducerConsumerCollection接口,因此完全符合您的要求。

您可以创建两个任务,一个用于异步生产者,另一个作为使用者工作线程。前者会将项目添加到BlockingCollection,而后者只是在FIFO订单中有新的可用时立即消耗。

使用 TPL 任务和阻塞集合的生产者-消费者示例应用程序:

class ProducerConsumer
{
    private static BlockingCollection<string> queue = new BlockingCollection<string>();
    static void Main(string[] args)
    {
        Start();
    }
    public static void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => RunProducer());
        var consumerWorker = Task.Factory.StartNew(() => RunConsumer());
        Task.WaitAll(producerWorker, consumerWorker);
    }
    private static void RunProducer()
    {
        int itemsCount = 100;
        while (itemsCount-- > 0)
        {
            queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
            Thread.Sleep(250);
        }
    }
    private static void RunConsumer()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
           Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
        }
    }
}

IProducerConsumerCollection:

定义用于操作用于 生产者/消费者使用情况。此接口提供统一的 代表生产者/消费者收藏,以便更高层次 抽象,例如 System.Collections.Concurrent.BlockingCollection(Of T) 可以使用 集合作为底层存储机制。

既然你需要一个队列,为什么不坚持排队呢?您可以使用同步队列。