等待 RabbitMQ 线程在 Windows Service OnStop() 中完成
本文关键字:OnStop Service RabbitMQ 线程 Windows 等待 | 更新日期: 2023-09-27 18:34:05
我正在开发一个用C#(.NET 4.5,VS2012(编写的Windows服务,它使用RabbitMQ(通过订阅接收消息(。有一个类派生自DefaultBasicConsumer
,在这个类中有两个实际的消费者(所以两个通道(。由于有两个通道,因此两个线程处理传入消息(来自两个不同的队列/路由密钥(,并且都调用相同的HandleBasicDeliver(...)
函数。
现在,当调用 Windows 服务OnStop()
时(当有人停止服务时(,我想让这两个线程完成处理其消息(如果它们当前正在处理消息(,将确认发送到服务器,然后停止服务(中止线程等(。
我想过多种解决方案,但似乎没有一个是非常好的。这是我尝试过的:
- 使用一个互斥锁;每个线程在进入 HandleBasicDelivery 时尝试获取它,然后在之后释放它。当调用
OnStop()
时,主线程会尝试获取相同的互斥锁,从而有效地阻止 RabbitMQ 线程实际处理更多消息。缺点是,一次只有一个使用者线程可以处理消息。
使用两个互斥锁 :每个 RabbitMQ 线程都使用不同的互斥锁,因此它们不会在 HandleBasicDeliver(( 中相互阻塞 - 我可以区分哪个线程实际上是根据路由密钥处理当前消息。像这样:
HandleBasicDeliver(...) { if(routingKey == firstConsumerRoutingKey) { // Try to grab the mutex of the first consumer } else { // Try to grab the mutex of the second consumer } }
当调用
OnStop()
时,主线程将尝试获取两个互斥锁;一旦两个互斥锁都"掌握在"主线程手中,它就可以继续停止服务。问题是:如果将另一个消费者添加到此类中,我需要更改大量代码。- 使用计数器或
CountdownEvent
.计数器从 0 开始,每次输入HandleBasicDeliver()
时,计数器都会使用 Interlock 类安全地递增。处理消息后,计数器递减。调用OnStop()
时,主线程会检查计数器是否为 0。如果满足此条件,它将继续存在。但是,在检查计数器是否为 0 后,某些 RabbitMQ 线程可能会开始处理消息。 - 调用
OnStop()
时,关闭与 RabbitMQ 的连接(以确保没有新消息到达(,然后等待几秒钟(如果有任何消息正在处理,则完成处理(,然后再关闭应用程序。问题是,在关闭应用程序之前我应该等待的确切秒数是未知的,所以这不是一个优雅或精确的解决方案。
我意识到设计不符合单一责任原则,这可能会导致缺乏解决方案。但是,是否可以在不重新设计项目的情况下很好地解决此问题?
我们在应用程序中这样做, 主要思想是使用 CancelTokenSource
在您的 Windows 服务上添加以下内容:
private static readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource();
然后在你的兔子消费者中这样做:1. 从使用Dequeue
改为DequeueNoWait
2. 让您的兔子消费者检查取消令牌
这是我们的代码:
public async Task StartConsuming(IMessageBusConsumer consumer, MessageBusConsumerName fullConsumerName, CancellationToken cancellationToken)
{
var queueName = GetQueueName(consumer.MessageBusConsumerEnum);
using (var model = _rabbitConnection.CreateModel())
{
// Configure the Quality of service for the model. Below is how what each setting means.
// BasicQos(0="Don't send me a new message until I’ve finished", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
model.BasicQos(0, consumer.FetchCount.Value, false);
var queueingConsumer = new QueueingBasicConsumer(model);
model.BasicConsume(queueName, false, fullConsumerName, queueingConsumer);
var queueEmpty = new BasicDeliverEventArgs(); //This is what gets returned if nothing in the queue is found.
while (!cancellationToken.IsCancellationRequested)
{
var deliverEventArgs = queueingConsumer.Queue.DequeueNoWait(queueEmpty);
if (deliverEventArgs == queueEmpty)
{
// This 100ms wait allows the processor to go do other work.
// No sense in going back to an empty queue immediately.
// CancellationToken intentionally not used!
// ReSharper disable once MethodSupportsCancellation
await Task.Delay(100);
continue;
}
//DO YOUR WORK HERE!
}
}
通常,我们如何确保Windows服务在处理完成之前不会停止是使用如下所示的代码。希望有帮助。
protected override void OnStart(string[] args)
{
// start the worker thread
_workerThread = new Thread(WorkMethod)
{
// !!!set to foreground to block windows service be stopped
// until thread is exited when all pending tasks complete
IsBackground = false
};
_workerThread.Start();
}
protected override void OnStop()
{
// notify the worker thread to stop accepting new migration requests
// and exit when all tasks are completed
// some code to notify worker thread to stop accepting new tasks internally
// wait for worker thread to stop
_workerThread.Join();
}