什么是在RabbitMQ的C#客户端中优雅地停止消息消耗的优雅方式

本文关键字:消息 方式 客户端 什么 RabbitMQ | 更新日期: 2023-09-27 18:24:55

我正在设置一个标准的独立线程,在C#中侦听RabbitMQ。假设在线程中侦听的方法如下:

public void Listen()
{
    using (var channel = connection.CreateModel())
    {
        var consumer = SetupQueues(channel);
        while (true)
        {
            var ea = consumer.Queue.Dequeue();    // blocking call
            handler.HandleMessage(channel, ea);
        }
    }
}

什么是在RabbitMQ的C#客户端中优雅地停止消息消耗的优雅方法?请记住,我在RabbitMQ示例/文档或这些SO问题中没有发现任何用处:

  • 如何停止使用选择性队列中的消息-RabbitMQ
  • 如何在rabbitmqpika-python中优雅地暂停和恢复消费
  • 在运行RabbitMQ使用者的情况下安全地结束java应用程序的最佳方法是什么

这里的问题是consumer.Queue.Dequeue()是一个阻塞调用。我尝试过这些选项:

  • 调用channel.BasicCancel(string tag)。这导致阻塞调用中出现System.IO.EndOfStreamException。出于明显的原因,我不想将此异常作为控制流的一部分。

  • 调用consumer.Queue.Dequeue(int millisecondsTimeout, out T result)并在循环迭代之间检查标志。这是可行的,但看起来很粗糙。

我想让线程优雅地退出,并清理我可能拥有的任何非托管资源,这样就不会有线程中止等。

感谢您的帮助。感谢

什么是在RabbitMQ的C#客户端中优雅地停止消息消耗的优雅方式

带超时的DeQueue;标志是实现这一点的方法。这是一种非常常见的模式,也是为什么许多阻塞调用都提供了启用超时的版本。

或者,抛出(已知的)异常对控制流来说并不一定是坏事。优雅地关闭可能意味着实际捕获异常,注释"这是在请求关闭通道时抛出的",然后干净地返回。这就是TPL的一部分如何使用CancellationToken。

阻塞方法不是属性事件驱动的。我不知道他们为什么建议使用consumer.Queue.Dequeue();

总之,我通常不使用consumer.Queue.Dequeue();

我以这种方式扩展默认消费者:

class MyConsumer : DefaultBasicConsumer {
  public MyConsumer(IModel model):base(model)
  {
  }
  public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) {
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);
  }
}

class Program
{
  static void Main(string[] args)
  {
    var factory = new ConnectionFactory() { Uri = "amqp://aa:bbb@lemur.cloudamqp.com/xxx" };
    using (var connection = factory.CreateConnection())
    {
      using (var channel = connection.CreateModel())
      {
      channel.QueueDeclare("hello", false, false, false, null);
      var consumer = new MyConsumer(channel);
      String tag = channel.BasicConsume("hello", true, consumer);
      Console.WriteLine(" [*] Waiting for messages." +
                               " any key to exit");
      Console.ReadLine();
      channel.BasicCancel(tag);

        /*while (true)
        {
          /////// DON'T USE THIS   
          var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
          var body = ea.Body;
          var message = Encoding.UTF8.GetString(body);
          Console.WriteLine(" [x] Received {0}", message);
        }*/
      }
    }

  }
}

通过这种方式,您就没有阻塞方法,并且可以正确地释放所有资源。

编辑

我认为使用ctrl+C来破坏程序总是错误的。