如何在c#中实现排队消息的等待系统?
本文关键字:消息 等待 系统 排队 实现 | 更新日期: 2023-09-27 18:16:57
我有一个服务,它使用一个线程等待传入消息,然后将它们放入队列。传入消息的频率各不相同。它们可以频繁出现,也可以大批量发送,每批之间的等待时间更长。
另一个线程检查队列并处理有消息的消息。如果没有消息,那么它应该等待,直到有消息被放入队列。现在处理消息的代码如下所示:
public void Run() {
while(thereAreNoItemsInQueue) {
Thread.Sleep(1);
}
//Start processing messages from queue
}
我听说在这样的循环中使用Sleep是不好的做法,那么在这种情况下实现等待的最佳做法是什么?我已经研究了其他的方法来做等待,从使用AutoResetEvent/ManualResetEvent或使用监视器,但我不确定哪一个是最好的。
- 我还应该补充一点,这些消息被保存到的队列是一个事务性MSMQ
c#中没有。从来没有。看到:
它们可以经常来,有时也可以大量发送每批之间的等待时间更长。
这可能也意味着处理批处理的时间,这意味着"使用持久性"。
Windows有一个队列机制(MSMQ),你可以使用数据库。但是,如果您必须处理工作项,则不应该在没有持久性的情况下始终保持工作项在过程中。如果服务器蓝屏怎么办?
将它们转储到持久队列中,然后从该队列开始工作。
如果你必须让他们排队,…为什么要阻塞线程?那是浪费。
当消息到来时,创建一个TASK来处理队列(如果不存在)。这样,没有消息的队列就不会阻塞线程(这是昂贵的)。
对于一些非常琐碎的事情,考虑使用自定义调度程序(以确保它们处理的时间最多)来设计任务。X同时)-瞧。它不像在进程中排队,在。net中已经没有多种机制了。
您可以使用BlockingCollection
。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
class Program
{
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
Task.Factory.StartNew(consumer);
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; ++i)
{
Console.WriteLine("Queueing work item {0}", i);
inputQueue.Add(i);
Thread.Sleep(50);
}
Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
Task.WaitAll(workers);
outputQueue.CompleteAdding();
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
Thread.Sleep(100); // Simulate work.
outputQueue.Add(workItem); // Output completed item.
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
void consumer()
{
Console.WriteLine("Consumer is starting.");
foreach (var workItem in outputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Consumer is using item {0}", workItem);
Thread.Sleep(25);
}
Console.WriteLine("Consumer is finished.");
}
BlockingCollection<int> inputQueue = new BlockingCollection<int>();
BlockingCollection<int> outputQueue = new BlockingCollection<int>();
}
}
但是如果你正在使用.Net 4.5
,你可以从任务并行库(TPL)中查看数据流(它可能有一些更好的,高层次的答案-以陡峭的学习曲线为代价,但我认为值得投入时间)。