如何在c#中实现排队消息的等待系统?

本文关键字:消息 等待 系统 排队 实现 | 更新日期: 2023-09-27 18:16:57

我有一个服务,它使用一个线程等待传入消息,然后将它们放入队列。传入消息的频率各不相同。它们可以频繁出现,也可以大批量发送,每批之间的等待时间更长。

另一个线程检查队列并处理有消息的消息。如果没有消息,那么它应该等待,直到有消息被放入队列。现在处理消息的代码如下所示:

public void Run() {
    while(thereAreNoItemsInQueue) {
        Thread.Sleep(1);
    }
    //Start processing messages from queue
}
我听说在这样的循环中使用Sleep是不好的做法,那么在这种情况下实现等待的最佳做法是什么?我已经研究了其他的方法来做等待,从使用AutoResetEvent/ManualResetEvent或使用监视器,但我不确定哪一个是最好的。
    我还应该补充一点,这些消息被保存到的队列是一个事务性MSMQ

如何在c#中实现排队消息的等待系统?

c#中没有。从来没有。看到:

它们可以经常来,有时也可以大量发送每批之间的等待时间更长。

这可能也意味着处理批处理的时间,这意味着"使用持久性"。

Windows有一个队列机制(MSMQ),你可以使用数据库。但是,如果您必须处理工作项,则不应该在没有持久性的情况下始终保持工作项在过程中。如果服务器蓝屏怎么办?

将它们转储到持久队列中,然后从该队列开始工作。

如果你必须让他们排队,…为什么要阻塞线程?那是浪费。

当消息到来时,创建一个TASK来处理队列(如果不存在)。这样,没有消息的队列就不会阻塞线程(这是昂贵的)。

对于一些非常琐碎的事情,考虑使用自定义调度程序(以确保它们处理的时间最多)来设计任务。X同时)-瞧。它不像在进程中排队,在。net中已经没有多种机制了。

您可以使用BlockingCollection

下面是一些示例代码(一个控制台应用程序)。在这个示例中,我使用了多个工作线程和一个消费者线程,但是您的代码可能只使用其中一个:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }
        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];
            Task.Factory.StartNew(consumer);
            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }
            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }
            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);
            outputQueue.CompleteAdding();
            Console.WriteLine("Done.");
            Console.ReadLine();
        }
        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);
            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100);          // Simulate work.
                outputQueue.Add(workItem);  // Output completed item.
            }
            Console.WriteLine("Worker {0} is stopping.", workerId);
        }
        void consumer()
        {
            Console.WriteLine("Consumer is starting.");
            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }
            Console.WriteLine("Consumer is finished.");
        }
        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}

但是如果你正在使用.Net 4.5,你可以从任务并行库(TPL)中查看数据流(它可能有一些更好的,高层次的答案-以陡峭的学习曲线为代价,但我认为值得投入时间)。