什么是在RabbitMQ的C#客户端中优雅地停止消息消耗的优雅方式
本文关键字:消息 方式 客户端 什么 RabbitMQ | 更新日期: 2023-09-27 18:24:55
我正在设置一个标准的独立线程,在C#中侦听RabbitMQ。假设在线程中侦听的方法如下:
public void Listen()
{
using (var channel = connection.CreateModel())
{
var consumer = SetupQueues(channel);
while (true)
{
var ea = consumer.Queue.Dequeue(); // blocking call
handler.HandleMessage(channel, ea);
}
}
}
什么是在RabbitMQ的C#客户端中优雅地停止消息消耗的优雅方法?请记住,我在RabbitMQ示例/文档或这些SO问题中没有发现任何用处:
- 如何停止使用选择性队列中的消息-RabbitMQ
- 如何在rabbitmqpika-python中优雅地暂停和恢复消费
- 在运行RabbitMQ使用者的情况下安全地结束java应用程序的最佳方法是什么
这里的问题是consumer.Queue.Dequeue()
是一个阻塞调用。我尝试过这些选项:
调用
channel.BasicCancel(string tag)
。这导致阻塞调用中出现System.IO.EndOfStreamException
。出于明显的原因,我不想将此异常作为控制流的一部分。调用
consumer.Queue.Dequeue(int millisecondsTimeout, out T result)
并在循环迭代之间检查标志。这是可行的,但看起来很粗糙。
我想让线程优雅地退出,并清理我可能拥有的任何非托管资源,这样就不会有线程中止等。
感谢您的帮助。感谢
带超时的DeQueue;标志是实现这一点的方法。这是一种非常常见的模式,也是为什么许多阻塞调用都提供了启用超时的版本。
或者,抛出(已知的)异常对控制流来说并不一定是坏事。优雅地关闭可能意味着实际捕获异常,注释"这是在请求关闭通道时抛出的",然后干净地返回。这就是TPL的一部分如何使用CancellationToken。
阻塞方法不是属性事件驱动的。我不知道他们为什么建议使用consumer.Queue.Dequeue();
总之,我通常不使用consumer.Queue.Dequeue();
我以这种方式扩展默认消费者:
class MyConsumer : DefaultBasicConsumer {
public MyConsumer(IModel model):base(model)
{
}
public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) {
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}
}
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { Uri = "amqp://aa:bbb@lemur.cloudamqp.com/xxx" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("hello", false, false, false, null);
var consumer = new MyConsumer(channel);
String tag = channel.BasicConsume("hello", true, consumer);
Console.WriteLine(" [*] Waiting for messages." +
" any key to exit");
Console.ReadLine();
channel.BasicCancel(tag);
/*while (true)
{
/////// DON'T USE THIS
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
}*/
}
}
}
}
通过这种方式,您就没有阻塞方法,并且可以正确地释放所有资源。
编辑
我认为使用ctrl+C来破坏程序总是错误的。