等待 RabbitMQ 线程在 Windows Service OnStop() 中完成

本文关键字:OnStop Service RabbitMQ 线程 Windows 等待 | 更新日期: 2023-09-27 18:34:05

我正在开发一个用C#(.NET 4.5,VS2012(编写的Windows服务,它使用RabbitMQ(通过订阅接收消息(。有一个类派生自DefaultBasicConsumer,在这个类中有两个实际的消费者(所以两个通道(。由于有两个通道,因此两个线程处理传入消息(来自两个不同的队列/路由密钥(,并且都调用相同的HandleBasicDeliver(...)函数。

现在,当调用 Windows 服务OnStop()时(当有人停止服务时(,我想让这两个线程完成处理其消息(如果它们当前正在处理消息(,将确认发送到服务器,然后停止服务(中止线程等(。

我想过多种解决方案,但似乎没有一个是非常好的。这是我尝试过的:

  • 使用一个互斥锁;每个线程在进入 HandleBasicDelivery 时尝试获取它,然后在之后释放它。当调用OnStop()时,主线程会尝试获取相同的互斥锁,从而有效地阻止 RabbitMQ 线程实际处理更多消息。缺点是,一次只有一个使用者线程可以处理消息。
  • 使用两个互斥锁
  • :每个 RabbitMQ 线程都使用不同的互斥锁,因此它们不会在 HandleBasicDeliver(( 中相互阻塞 - 我可以区分哪个线程实际上是根据路由密钥处理当前消息。像这样:

    HandleBasicDeliver(...)
    {
        if(routingKey == firstConsumerRoutingKey)
        {
            // Try to grab the mutex of the first consumer
        }
        else
        {
            // Try to grab the mutex of the second consumer
        }
    }
    

    当调用OnStop()时,主线程将尝试获取两个互斥锁;一旦两个互斥锁都"掌握在"主线程手中,它就可以继续停止服务。问题是:如果将另一个消费者添加到此类中,我需要更改大量代码。

  • 使用计数器或CountdownEvent .计数器从 0 开始,每次输入 HandleBasicDeliver() 时,计数器都会使用 Interlock 类安全地递增。处理消息后,计数器递减。调用 OnStop() 时,主线程会检查计数器是否为 0。如果满足此条件,它将继续存在。但是,在检查计数器是否为 0 后,某些 RabbitMQ 线程可能会开始处理消息。
  • 调用 OnStop() 时,关闭与 RabbitMQ 的连接(以确保没有新消息到达(,然后等待几秒钟(如果有任何消息正在处理,则完成处理(,然后再关闭应用程序。问题是,在关闭应用程序之前我应该等待的确切秒数是未知的,所以这不是一个优雅或精确的解决方案。

我意识到设计不符合单一责任原则,这可能会导致缺乏解决方案。但是,是否可以在不重新设计项目的情况下很好地解决此问题?

等待 RabbitMQ 线程在 Windows Service OnStop() 中完成

我们在应用程序中这样做, 主要思想是使用 CancelTokenSource

在您的 Windows 服务上添加以下内容:

private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();

然后在你的兔子消费者中这样做:1. 从使用Dequeue改为DequeueNoWait2. 让您的兔子消费者检查取消令牌

这是我们的代码:

        public async Task StartConsuming(IMessageBusConsumer consumer, MessageBusConsumerName fullConsumerName, CancellationToken cancellationToken)
        {
            var queueName = GetQueueName(consumer.MessageBusConsumerEnum);
            using (var model = _rabbitConnection.CreateModel())
            {
                // Configure the Quality of service for the model. Below is how what each setting means.
                // BasicQos(0="Don't send me a new message until I’ve finished",  _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
                model.BasicQos(0, consumer.FetchCount.Value, false);
                var queueingConsumer = new QueueingBasicConsumer(model);
                model.BasicConsume(queueName, false, fullConsumerName, queueingConsumer);

                var queueEmpty = new BasicDeliverEventArgs(); //This is what gets returned if nothing in the queue is found.
                while (!cancellationToken.IsCancellationRequested)
                {
                    var deliverEventArgs = queueingConsumer.Queue.DequeueNoWait(queueEmpty);
                    if (deliverEventArgs == queueEmpty)
                    {
                        // This 100ms wait allows the processor to go do other work.
                        // No sense in going back to an empty queue immediately. 
                        // CancellationToken intentionally not used!
                        // ReSharper disable once MethodSupportsCancellation
                        await Task.Delay(100);  
                        continue;
                    }
                    //DO YOUR WORK HERE!
                  }
}

通常,我们如何确保Windows服务在处理完成之前不会停止是使用如下所示的代码。希望有帮助。

    protected override void OnStart(string[] args)
    {
        // start the worker thread
        _workerThread = new Thread(WorkMethod)
        {
            // !!!set to foreground to block windows service be stopped
            // until thread is exited when all pending tasks complete
            IsBackground = false
        };
        _workerThread.Start();
    }
    protected override void OnStop()
    {
        // notify the worker thread to stop accepting new migration requests
        // and exit when all tasks are completed
        // some code to notify worker thread to stop accepting new tasks internally
        // wait for worker thread to stop
        _workerThread.Join();
    }