多线程MSMQ监听

本文关键字:监听 MSMQ 多线程 | 更新日期: 2023-09-27 17:51:09

我目前从我的MSMQ阅读如下(简化为简洁):

public void Start()
{
    this.queue.ReceiveCompleted += this.ReceiveCompleted;
    this.queue.BeginReceive();
}
void ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
    this.queue.EndReceive(e.AsyncResult);
    try
    {
        var m = e.Message;
        m.Formatter = this.formatter;
        this.Handle(m.Body);
    }
    finally
    {
        this.queue.BeginReceive();
    }
}

然而,这只允许我处理串行消息。如何修改此代码以允许并行消息处理?

我知道我可以将this.queue.BeginReceive();移出finally并进入ReceiveCompleted的顶部,但如何阻止生成尽可能多的线程,因为我有消息?如何合理地控制并行度级别,以免线程池被淹没?是否有一些内置机制,或者我必须编写自己的管理器?

编辑:我的目标是更快地处理消息。消息的处理涉及到对第三方的异步调用,所以目前我的实现在通过队列上浪费了大量时间。

谢谢

多线程MSMQ监听

我认为托管更多队列读取器实例会更简单。然后,您可以根据需要通过部署/取消部署更多实例来快速扩展或缩小规模。

它也变成了一个管理问题,而不是一个开发问题,而这正是扩展应该是什么。

你可以使用"生产者消费者模式"…

。. NET 4和更高版本的Concurrent集合是线程安全的,并且实现了"大部分无锁"(因此在多线程中表现良好)…

你可以使用BlockingCollection与TPL相结合来实现你想要的,而不用担心线程池耗尽或类似的…你只需要将this.Handle(m.Body);行更改为MyBlockingCollection.Add(m.Body);之类的东西,并启动在MyBlockingCollection上工作的"消费者线程"并执行实际工作(例如,在MyBlockingCollection的下一个项目上调用this.Handle,例如通过调用TryTake获得)…