多线程MSMQ监听
本文关键字:监听 MSMQ 多线程 | 更新日期: 2023-09-27 17:51:09
我目前从我的MSMQ阅读如下(简化为简洁):
public void Start()
{
this.queue.ReceiveCompleted += this.ReceiveCompleted;
this.queue.BeginReceive();
}
void ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
this.queue.EndReceive(e.AsyncResult);
try
{
var m = e.Message;
m.Formatter = this.formatter;
this.Handle(m.Body);
}
finally
{
this.queue.BeginReceive();
}
}
然而,这只允许我处理串行消息。如何修改此代码以允许并行消息处理?
我知道我可以将this.queue.BeginReceive();
移出finally
并进入ReceiveCompleted
的顶部,但如何阻止生成尽可能多的线程,因为我有消息?如何合理地控制并行度级别,以免线程池被淹没?是否有一些内置机制,或者我必须编写自己的管理器?
编辑:我的目标是更快地处理消息。消息的处理涉及到对第三方的异步调用,所以目前我的实现在通过队列上浪费了大量时间。
谢谢
我认为托管更多队列读取器实例会更简单。然后,您可以根据需要通过部署/取消部署更多实例来快速扩展或缩小规模。
它也变成了一个管理问题,而不是一个开发问题,而这正是扩展应该是什么。
你可以使用"生产者消费者模式"…
。. NET 4和更高版本的Concurrent
集合是线程安全的,并且实现了"大部分无锁"(因此在多线程中表现良好)…
你可以使用BlockingCollection
与TPL相结合来实现你想要的,而不用担心线程池耗尽或类似的…你只需要将this.Handle(m.Body);
行更改为MyBlockingCollection.Add(m.Body);
之类的东西,并启动在MyBlockingCollection
上工作的"消费者线程"并执行实际工作(例如,在MyBlockingCollection
的下一个项目上调用this.Handle
,例如通过调用TryTake
获得)…