RabbitMQ 消费者恢复

本文关键字:恢复 消费者 RabbitMQ | 更新日期: 2023-09-27 18:31:11

我最近开始深入研究RabbitMQ。我使用 RabbitMQ .net 库创建了一个 Windows 服务,它充当我的使用者。此消费者将用于处理批量流程,例如发送大量电子邮件等。

我通过实现作为 RabbitMQ .net 库一部分的 SimpleRpcServer 类并重写 HandleCall/HandleCast 方法来构建它。在使用和处理消息方面,一切都很好。我们已经开始研究可用于将此 Windows 服务部署到 Amazon Web Services 服务器的部署选项。将更新部署到 Windows 服务时,必须停止、更新该服务,然后重新启动该服务。

我的问题是:我该怎么做,以便在 Windows 服务上触发 Stop 事件时,该服务要么等待所有当前传递给使用者的消息完成处理,要么重新排队任何已传递但尚未开始处理的消息。

下面是一些示例代码:

public partial class ExampleService: ServiceBase
{    
    private List<Task> _watcherTasks = new List<Task>();
    protected override void OnStart(string[] args)
    {
        var factory = new ConnectionFactory() { 
                HostName = _hostname,
                VirtualHost = _virtualHost,
                UserName = _username,
                Password = _password,
                Ssl = new SslOption
                {
                    Enabled = true,
                    ServerName = _hostname,
                    AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
                                            SslPolicyErrors.RemoteCertificateChainErrors
                },
                RequestedHeartbeat = 30
            };
            conn = factory.CreateConnection();
            var emailQueue = requestChannel.QueueDeclare("email", false, false, false, null);
            var emailSub = new Subscription(requestChannel, emailQueue);
            var emailServer = new ServiceBusConsumer(emailSub);
            Task emailWatcher = Task.Run(() => emailServer.MainLoop());            
            _watcherTasks.Add(emailWatcher);
     }
     protected override void OnStop()
     {
          conn.Close();
          Task.WaitAll(_watcherTasks.ToArray(), 60000);    
     }
}
public class ServiceBusConsumer : SimpleRpcServer
{
    public ServiceBusConsumer(Subscription subscription) : base(subscription)
    {
    }
    public override void HandleSimpleCast(bool isRedelivered, RabbitMQ.Client.IBasicProperties requestProperties, byte[] body)
    {
        try
        {
             //Uses some reflection and invokes function to process the message here.
        }
        catch (Exception ex)
        {
             //Creates event log entry of exception
        }
    }
}

RabbitMQ 消费者恢复

不能在使用消息后将消息显式推送回队列。终止使用消息的进程应会导致消息在超时期限后重新排队,前提是尚未发送确认,并且您的队列配置为接收确认。但请注意,这可能会中断队列中消息的排序。

例如,假设您发送消息 M1 和 M2,其中 M2 依赖于 M1。如果您在处理 M1 时终止使用者,并且 M2 仍在队列中,则 M1 将在 M2 后面重新排队。当您的使用者重新启动时,这些消息将被无序处理。

试图实现这一点会带来太多的复杂性。我只是允许 Windows 服务完成处理其取消排队的消息,然后简单地停止侦听队列,并正常终止。您的代码应如下所示:

while (!stopConsuming) {
    try {
        BasicDeliverEventArgs basicDeliverEventArgs;
        var messageIsAvailable = consumer.Queue.Dequeue(timeout, out basicDeliverEventArgs);
        if (!messageIsAvailable) continue;
            var payload = basicDeliverEventArgs.Body;
            var message = Encoding.UTF8.GetString(payload);
            OnMessageReceived(new MessageReceivedEventArgs {
                Message = message,
                EventArgs = basicDeliverEventArgs
            });
            if (implicitAck && !noAck) channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
        }
        catch (Exception exception) {
            OnMessageReceived(new MessageReceivedEventArgs {
                Exception = new AMQPConsumerProcessingException(exception)
            });
            if (!catchAllExceptions) Stop();
        }
    }
}

在此示例中,stopConsuming 变量(如果从另一个线程访问,则标记为volatile)将确定 Windows 服务在处理完当前消息后是否继续从队列读取消息。如果设置为 true ,循环将结束,并且不会再有消息被取消排队。

我将部署一个 C# 库,该库完全可以实现您的要求等。请查看我的博客,insidethecpu.com 以获取详细信息和教程。您可能也对此感兴趣。