应用程序崩溃&在使用者运行时退出调试器&重启Rabbit mq服务

本文关键字:重启 Rabbit mq 调试器 服务 运行时 崩溃 使用者 应用程序 退出 | 更新日期: 2023-09-27 18:07:50

我有一个c#应用程序,我试图测试我的rabbit mq客户端的弹性。在消费者运行时,我停止rabbit mq服务,以查看消费者将如何处理此问题。

我在几乎所有的消费者上都有trycatch,但是由于后台线程中可能出现异常,我的应用程序在输出窗口中打印以下内容并存在调试器。

线程"AMQP连接AMQP://test.com:5671"(0x6da18)已退出代码为0 (0x0)。

System.Net类型的第一次异常。webeexception '发生在System.dll

然后存在调试器。我唯一注意到的是,在代码存在之前调用了消费者类的析构函数。

using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Util;
namespace RabbitMQClient
{
  public class MessageQueueConsumer : IHealthVerifiable
  {
    public class TimeoutException : Exception { }
    // Have to do this because, somehow, SharedQueue implementation of IEnumerable is faulty
    // Count() method hangs, and never returns
    private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
    {
      public int Count()
      {
        return this.m_queue.Count;
      }
    }
    private const int DEFAULT_ACK_COUNT = 1000;
    private String connString;
    private QueueingBasicConsumer consumer;
    private IConnection conn;
    private IModel channel;
    private String queueName;
    private BufferQueue buffer;
    private Object locker = new Object();
    private ushort prefetchCount;
    private ushort ackCount;
    public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
    {
      this.queueName = queueName;
      this.connString = connString;
      if (ackCount != null)
        this.ackCount = ackCount.Value;
      else
        this.ackCount = DEFAULT_ACK_COUNT;
      this.prefetchCount = (ushort)(this.ackCount * 2);
      InitConsumer();
    }
    ~MessageQueueConsumer()
    {
      Close();
    }
    public void Close()
    {
      try
      {
        channel.Close(200, queueName + " Goodbye");
        conn.Close();
      }
      catch { } //if already closed, do nothing
    }
    private void InitConsumer()
    {
      try
      {
        ConnectionFactory factory = new ConnectionFactory();
        factory.Uri = connString;
        conn = factory.CreateConnection();
        channel = conn.CreateModel();
        channel.BasicQos(0, prefetchCount, false);
        buffer = new BufferQueue();
        consumer = new QueueingBasicConsumer(channel, buffer);
        channel.BasicConsume(queueName, false, consumer);
      }
      catch (Exception e)
      {
        InitConsumer();
      }
    }
    /// <summary>
    /// Get the next event from the queue
    /// </summary>
    /// <returns>Event</returns>
    public byte[] Dequeue(int? timeout = null)
    {
      lock (locker)
      {
        try
        {
          return AttemptDequeue(timeout);
        }
        catch (EndOfStreamException)
        {
          // Network interruption while reading the input stream
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (OperationInterruptedException)
        {
          // The consumer was removed, either through channel or connection closure, or through the
          // action of IModel.BasicCancel().
          // Attempt to reopen and try again
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (ConnectFailureException)
        {
          //Problems connecting to the queue, wait 10sec, then try again. 
          Thread.Sleep(10000);
          InitConsumer();
          return AttemptDequeue(timeout);
        }
        catch (Exception e)
        {
          //Problems connecting to the queue, wait 10sec, then try again. 
          Thread.Sleep(10000);
          InitConsumer();
          return AttemptDequeue(timeout);
        }
      }
    }
    private byte[] AttemptDequeue(int? tomeout)
    {
      BasicDeliverEventArgs message;
      if (tomeout == null)
        message = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
      else
      {
        if (!consumer.Queue.Dequeue(tomeout.Value, out message))
          throw new TimeoutException();
      }
      if (buffer.Count() == 0 || buffer.Count() == ackCount)
        channel.BasicAck(message.DeliveryTag, true);
      try
      {
        return message.Body;
      }
      catch (Exception e)
      {
        throw new SerializationException("Error deserializing queued message:", e);
      }
    }
    /// <summary>
    /// Attempt to connect to queue to see if it is available
    /// </summary>
    /// <returns>true if queue is available</returns>
    public bool IsHealthy()
    {
      try
      {
        if (channel.IsOpen)
          return true;
        else
        {
          InitConsumer();
          return true;
        }
      }
      catch
      {
        return false;
      }
    }
  }
}

任何想法我可以捕获这个异常,并尝试重试连接?

应用程序崩溃&在使用者运行时退出调试器&重启Rabbit mq服务

问题是使用QueueingBasicConsumer不实现任何恢复方法。我更改为EventingBasicConsumer,从失败中恢复工作。

namespace RabbitMQClient {   public class MessageQueueConsumer : IHealthVerifiable   {
    public class TimeoutException : Exception { }
private class BufferQueue : SharedQueue<BasicDeliverEventArgs>
{
  public int Count()
  {
    return this.m_queue.Count;
  }
}
private const int DEFAULT_ACK_COUNT = 1000;
private String connString;
private EventingBasicConsumer consumer;
private IConnection conn;
private IModel channel;
private String queueName;
private BufferQueue buffer;
private Object locker = new Object();
private ushort prefetchCount;
private ushort ackCount;
public MessageQueueConsumer(String queueName, String connString, ushort? ackCount = null)
{
  this.queueName = queueName;
  this.connString = connString;
  if (ackCount != null)
    this.ackCount = ackCount.Value;
  else
    this.ackCount = DEFAULT_ACK_COUNT;
  this.prefetchCount = (ushort)(this.ackCount * 2);
  InitConsumer();
}
~MessageQueueConsumer()
{
  Close();
}
public void Close()
{
  try
  {
    channel.Close(200, queueName + " Goodbye");
   // conn.Close();
  }
  catch { } //if already closed, do nothing
}
private void InitConsumer()
{
  ConnectionFactory factory = new ConnectionFactory();
  factory.Uri = connString;
  conn = factory.CreateConnection();
  channel = conn.CreateModel();
  channel.BasicQos(0, prefetchCount, false);
  buffer = new BufferQueue();
  consumer = new EventingBasicConsumer(channel);
  channel.BasicConsume(queueName, false, consumer);
  // when message is recieved do following
  consumer.Received += (model, message) =>
  {
      if (buffer.Count() > DEFAULT_ACK_COUNT)
        Thread.Sleep(3000);
        buffer.Enqueue(message);
        if (buffer.Count() == 0 || buffer.Count() == ackCount)
          channel.BasicAck(message.DeliveryTag, true);

  };
}
/// <summary>
/// Get the next event from the queue
/// </summary>
/// <returns>Event</returns>
public byte[] Dequeue(int? timeout = null)
{
  lock (locker)
  {
    try
    {
      return AttemptDequeue(timeout);
    }
    catch (EndOfStreamException)
    {
      // Network interruption while reading the input stream
      InitConsumer();
      return AttemptDequeue(timeout);
    }
    catch (OperationInterruptedException)
    {
      // The consumer was removed, either through channel or connection closure, or through the
      // action of IModel.BasicCancel().
      // Attempt to reopen and try again
      InitConsumer();
      return AttemptDequeue(timeout);
    }
    catch (ConnectFailureException)
    {
      //Problems connecting to the queue, wait 10sec, then try again. 
      Thread.Sleep(10000);
      InitConsumer();
      return AttemptDequeue(timeout);
    }
  }
}
private byte[] AttemptDequeue(int? tomeout)
{
  BasicDeliverEventArgs message;
  while (true)
  {
    //while buffer has no events
    if (buffer.Count() == 0)
    {
      Thread.Sleep(3000);
    }
    else
    {
      message = buffer.Dequeue();
      break;
    }
  }
  try
  {
    return message.Body;
  }
  catch (Exception e)
  {
    throw new SerializationException("Error deserializing queued message:", e);
  }
}
/// <summary>
/// Attempt to connect to queue to see if it is available
/// </summary>
/// <returns>true if queue is available</returns>
public bool IsHealthy()
{
  try
  {
    if (channel.IsOpen)
      return true;
    else
    {
      InitConsumer();
      return true;
    }
  }
  catch
  {
    return false;
  }
}   } }