用于处理Windows消息队列的多线程Windows服务

本文关键字:Windows 多线程 服务 消息 处理 用于 队列 | 更新日期: 2023-09-27 18:21:49

这是我第一次尝试编写Windows服务。

此windows服务必须处理2个windows消息队列。

每个消息队列都应该有自己的线程,但我似乎无法将体系结构放置到位。

我遵循这个Windows服务不断运行,这允许我创建一个线程,在其中我正在处理一个队列。

这是我的服务类别:

    protected override void OnStart(string[] args)
    {
        _thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true };
        _thread.Start();
    }
    private void WorkerThreadFunc()
    {
        _addressCalculator = new GACAddressCalculator();
        while (!_shutdownEvent.WaitOne(0))
        {
            _addressCalculator.StartAddressCalculation();
        }
    }

    protected override void OnStop()
    {
        _shutdownEvent.Set();
        if (!_thread.Join(5000))
        { // give the thread 5 seconds to stop
            _thread.Abort();
        }
    }

在我的GACAddressCalculator.StartAddressCalculation()中,我正在创建一个队列处理器对象,它看起来像这样:

    public void StartAddressCalculation()
    {
        try
        {
            var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1);
            googleQueue.ProccessMessageQueue();
        }
        catch (Exception ex)
        {
        }
    }

这是GISGoogleQueue:

public class GISGoogleQueue : BaseMessageQueue
{

    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
        : base(queueName, threadCount, logger, messagesPerThread)
    {
    }
    public override void ProccessMessageQueue()
    {
        if (!MessageQueue.Exists(base.QueueName))
        {
            _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
            return;
        }
        var messageQueue = new MessageQueue(QueueName);
        var myVehMonLog = new VehMonLog();
        var o = new Object();
        var arrTypes = new Type[2];
        arrTypes[0] = myVehMonLog.GetType();
        arrTypes[1] = o.GetType();
        messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
        using (var pool = new Pool(ThreadCount))
        {
            // Infinite loop to process all messages in Queue
            for (; ; )
            {
                for (var i = 0; i < MessagesPerThread; i++)
                {
                    try
                    {
                        while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed

                        var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
                        if (message != null) // Check if message is not Null
                        {
                            var monLog = (VehMonLog)message.Body;
                            pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
                        }
                    }
                    catch (Exception ex)
                    {
                    }
                }
            }
        }
    }

}

现在,这对1个消息队列来说很好,但如果我想处理另一个消息队列,它就不会发生,因为我在ProccessMessageQueue方法中有一个无限循环。

我想在一个单独的线程中执行每个队列。

我认为我在WorkerThreadFunc()中犯了一个错误,我必须以某种方式从那里或在OnStart()中启动两个线程。

此外,如果你有任何关于如何改进这项服务的建议,那就太好了。

顺便说一句,我正在使用这个答案中的Pool Classhttps://stackoverflow.com/a/436552/1910735对于ProccessMessageQueue 内的线程池

用于处理Windows消息队列的多线程Windows服务

我建议更改您的服务类别如下(下面的注释):

protected override void OnStart(string[] args)
{
    _thread = new Thread(WorkerThreadFunc)
              {
                  Name = "Run Constantly Thread",
                  IsBackground = true
              };
    _thread.Start();
}
GISGoogleQueue _googleQueue1;
GISGoogleQueue _googleQueue2;
private void WorkerThreadFunc()
{
    // This thread is exclusively used to keep the service running.
    // As such, there's no real need for a while loop here.  Create
    // the necessary objects, start them, wait for shutdown, and
    // cleanup.
    _googleQueue1 = new GISGoogleQueue(...);
    _googleQueue1.Start();
    _googleQueue2 = new GISGoogleQueue(...);
    _googleQueue2.Start();
    _shutdownEvent.WaitOne();  // infinite wait
    _googleQueue1.Shutdown();
    _googleQueue2.Shutdown();
}
protected override void OnStop()
{
    _shutdownEvent.Set();
    if (!_thread.Join(5000))
    {
        // give the thread 5 seconds to stop
        _thread.Abort();
    }
}

我忽略了你的GACAddressCalculator。从您展示的内容来看,它似乎是GISGoogleQueue的一个薄包装。显然,如果它真的做了一些你没有表现出来的事情,就需要把它考虑在内

请注意,在WorkerThreadFunc()中创建了两个GISGoogleQueue对象。因此,让我们接下来看看如何创建这些对象以实现适当的线程模型。

public class GISGoogleQueue : BaseMessageQueue
{
    System.Threading.Thread _thread;
    System.Threading.ManualResetEvent _shutdownEvent;
    public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
         : base(queueName, threadCount, logger, messagesPerThread)
    {
        // Let this class wrap a thread object.  Create it here.
        _thread = new Thread(RunMessageQueueFunc()
                  {
                      Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(),
                      IsBackground = true
                  };
        _shutdownEvent = new ManualResetEvent(false);
    }
    public Start()
    {
        _thread.Start();
    }
    public Shutdown()
    {
        _shutdownEvent.Set();
        if (!_thread.Join(5000))
        {
            // give the thread 5 seconds to stop
            _thread.Abort();
        }
    }
    private void RunMessageQueueFunc()
    {
        if (!MessageQueue.Exists(base.QueueName))
        {
            _logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
            return;
        }
        var messageQueue = new MessageQueue(QueueName);
        var myVehMonLog = new VehMonLog();
        var o = new Object();
        var arrTypes = new Type[2];
        arrTypes[0] = myVehMonLog.GetType();
        arrTypes[1] = o.GetType();
        messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
        using (var pool = new Pool(ThreadCount))
        {
            // Here's where we'll wait for the shutdown event to occur.
            while (!_shutdownEvent.WaitOne(0))
            {
                for (var i = 0; i < MessagesPerThread; i++)
                {
                    try
                    {
                        // Stop execution until Tasks in pool have been executed
                        while (pool.TaskCount() >= MessagesPerThread) ;
                        // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
                        var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0));
                        if (message != null) // Check if message is not Null
                        {
                            var monLog = (VehMonLog)message.Body;
                            pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
                        }
                    }
                    catch (Exception ex)
                    {
                    }
                }
            }
        }
    }
}

这种方法的核心是使用由GISGoogleQueue类包装的Thread对象。对于您创建的每个GISGoogleQueue对象,您将获得一个封装的线程,该线程将在GISGoogleQueue对象上调用Start()后完成工作。

有几点。在RunMessageQueueFunc()中,您正在检查队列的名称是否存在。如果没有,函数将退出如果发生这种情况,线程也会退出。重点是,您可能希望在流程的早期进行检查。只是一个想法。

其次,请注意,您的无限循环已被针对_shutdownEvent对象的检查所取代。这样,当服务关闭时,循环将停止。为了及时性,您需要确保循环的完整传递不会花费太长时间。否则,您可能会在关闭后5秒中止线程。中止只是为了确保事情被拆除,但如果可能的话,应该避免。

我知道很多人会更喜欢使用Task类来做这样的事情。看来你在RunMessageQueueFunc()里面。但是对于在进程期间运行的线程,我认为Task类是错误的选择,因为它将线程池中的线程绑定在一起。对我来说,这就是Thread类的构建目的。

HTH

您可以像这样使用Parallel.ForEach;

 Parallel.ForEach(queueItems, ProcessQueue); //this will process each queue item in a separate thread

 private void ProcessQueue(QueueItem queue)
 {
     //your processing logic       
 }