多线程MSMQ读取

本文关键字:读取 MSMQ 多线程 | 更新日期: 2023-09-27 18:11:39

我做了一个Windows服务,从MSMQueue中读取消息,我需要并行地这样做(两个线程应该同时读取消息)。我该怎么做呢?下面是我的代码(基本上是按照课本写的):

public partial class MyNewService : ServiceBase
    {
        System.Messaging.MessageQueue mq;
        System.Messaging.Message mes;
        public MyNewService()
        {
            InitializeComponent();
            if (MessageQueue.Exists("MyServer''MyQueue"))
                mq = new System.Messaging.MessageQueue("MyServer''MyQueue");
            mq.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted);
            mq.BeginReceive();
        }
        private static void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
        {
           try
           {
                MessageQueue mq = (MessageQueue)source;
                Message m = mq.EndReceive(asyncResult.AsyncResult);
                // TODO: Process the m message here
                // Restart the asynchronous receive operation.
                mq.BeginReceive();
            }
            catch(MessageQueueException)
            {
             // Handle sources of MessageQueueException.
            }
            return; 
         }
}

下面是Main函数:

static class Program
  {
    static void Main()
    {
      ServiceBase[] ServicesToRun;
      ServicesToRun = new ServiceBase[] 
            { 
                new MyNewService() 
            };
      ServiceBase.Run(ServicesToRun);
    }
  }

多线程MSMQ读取

是否存在不能在多个线程上执行处理而不是在多个线程上执行脱队列的原因?

这是一个非常基本的实现-它使用ThreadPool来队列项,但是您随后依赖ThreadPool的队列来处理线程数和工作项数。这可能不适合你的情况,这取决于许多其他因素。

另外,请注意此处关于SetMaxThreads的备注部分。

private static void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
   try
   {
       MessageQueue mq = (MessageQueue)source;
       Message m = mq.EndReceive(asyncResult.AsyncResult);
       // TODO: Process each message on a separate thread
       // This will immediately queue all items on the threadpool,
       // so there may be more threads spawned than you really want
       // Change how many items are allowed to process concurrently using ThreadPool.SetMaxThreads()
       System.Threading.ThreadPool.QueueUserWorkItem(new WaitCallback(doWork), m);

       // Restart the asynchronous receive operation.
       mq.BeginReceive();
   }
   catch(MessageQueueException)
   {
       // Handle sources of MessageQueueException.
   }
   return; 
}
private static void doWork(object message)
{
    // TODO: Actual implementation here.
}

我将在多个windows服务上托管队列读取器的单线程实例。

这样,您可以通过增加额外的服务来增加吞吐量,或者通过减少服务来减少吞吐量。这比在代码中完成要简单得多。