RabbitMQ重新处理死信队列
本文关键字:队列 处理 新处理 RabbitMQ | 更新日期: 2023-09-27 18:13:15
我正在寻找一种方法,从兔子死信队列中获取项目,并将它们重新处理回它们来自的队列。有没有一种内置的方法来做到这一点?
您可以像使用普通队列一样使用来自DLX(实际上是来自DLQ)的消息。
您所建议的(从DLQ获取消息并将其发布到它最初是死信的队列)可能导致(通常也是这样)消息循环。
最佳实践方法是以某种单独的方式处理死信消息,或者根本不处理它们。
我没有找到一个内置的方法来做到这一点,所以我创建了我自己的小解决方案。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing.v0_9_1;
namespace RabbitMQReprocessDeadLetter
{
public class RabbitReprocessor
{
private readonly IModel _model;
private readonly string _deadLetterQueueName;
private const ushort FetchSize = 10;
private const string ConsumerName = "DeadLetterReprocessor";
public RabbitReprocessor(IConnection rabbitConnection, string deadLetterQueueName)
{
_deadLetterQueueName = deadLetterQueueName;
_model = rabbitConnection.CreateModel();
}
public void StartConsuming(CancellationTokenSource cancellationTokenSource = null)
{
// Configure the Quality of service for the model. Below is how what each setting means.
// BasicQos(0="Dont send me a new message untill I’ve finshed", _fetchSize = "Send me N messages at a time", false ="Apply to this Model only")
_model.BasicQos(0, FetchSize, false);
var queueingBasicConsumer = new QueueingBasicConsumer(_model);
_model.BasicConsume(_deadLetterQueueName, false, ConsumerName, queueingBasicConsumer);
while (true)
{
if (cancellationTokenSource != null && cancellationTokenSource.IsCancellationRequested)
{
return;
}
var e = queueingBasicConsumer.Queue.Dequeue(); // blocking call
var deathProperties = (List<object>) e.BasicProperties.Headers["x-death"];
var prop = (Dictionary<string, object>)deathProperties.Single();
var queueAsByteArray = (byte[])prop["queue"];
var queueName = queueAsByteArray.ConvertToString();
var data = e.Body;
try
{
Console.WriteLine("{0} => {1}", queueName, data.Deserialize<long>());
}
// ReSharper disable once EmptyGeneralCatchClause
catch { }
SendMessageToQueue(queueName, data);
_model.BasicAck(e.DeliveryTag, false);
}
}
/// <summary>
/// delivery the message directly into the queue from which it came.
///
/// You may want to put it back into an exchange instead of a queue.
/// </summary>
private void SendMessageToQueue(string queueName, byte[] messageBytes)
{
const string exchangeName = "";
if (string.IsNullOrEmpty(queueName))
{
throw new ArgumentNullException("queueName");
}
if (messageBytes == null)
{
throw new ArgumentNullException("messageBytes");
}
var basicProperties = new BasicProperties
{
DeliveryMode = 2//2 = durable
};
_model.BasicPublish(exchangeName, queueName, basicProperties, messageBytes);
}
}
}
完整的解决方案可以在这里找到:
https://github.com/jayhilden/RabbitMQReprocessDeadLetter