异步处理消息时的 BasicAck
本文关键字:BasicAck 消息 处理 异步 | 更新日期: 2024-11-07 23:06:44
我正在尝试设置一个 RabbitMQ 消息传递队列,以便我可以发送消息以启动长时间运行的进程,并且还可以在需要时发送消息以取消该长时间运行的进程。所以我从一个EventingBasicConsumer
开始,在我的Recieved
处理程序中做了这样的事情:
if (startProcess)
{
// start a long running process
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
这是行不通的,因为EventingBasicConsumer
不是多线程的,一次只能处理一条消息。因此,在完成长时间运行的过程之前,它无法处理取消消息(此时,显然没有意义)。所以接下来我尝试了这个:
if (startProcess)
{
Task.Run(() => {
// start a long running process
}
}
else if (cancelProcess)
{
// cancel the currently running process
}
channel.BasicAck(ea.DeliveryTag, false);
这行得通。我现在可以取消长时间运行的进程...但是,我承认立即运行长时间运行的进程的请求,而不是在它完成后。这意味着,如果长时间运行的进程崩溃,则该消息已被删除。因此,这将要求原始发送方保持跟踪,并让接收方必须发送回消息以表示它已完成,并且一切都变得有点复杂。
所以我想也许我可以将EventingBasicConsumer
更改为始终在新线程上触发其Received
事件。所以我创建了这样的东西:
public class AsyncRabbitConsumer : DefaultBasicConsumer
{
// code all the same as EventingBasicConsumer except this bit:
public override void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
base.HandleBasicDeliver(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
if (Received != null)
{
var args = new BasicDeliverEventArgs(consumerTag,
deliveryTag,
redelivered,
exchange,
routingKey,
properties,
body);
Task.Run(() =>
{
Received(this, args);
});
}
}
}
现在,在我的第一个代码片段中,我可以让它在长时间运行的进程仍在运行时处理取消消息,并且长时间运行的进程不会确认并删除它的消息,直到它真正完成(或取消)。所以这应该很棒...除非我取消时,我得到这个:
RabbitMQ.Client.Exceptions.AlreadyClosedException"类型为"RabbitMQ.Client"的异常.dll但未在用户代码中处理
其他信息:已关闭:AMQP 操作已中断:AMQP 关闭原因,由对等方发起,代码 = 406,文本="PRECONDITION_FAILED - 未知传递标记 3",类 Id = 60,方法ID = 80,原因=
从似乎是启动长时间运行进程的线程的channel.BasicAck
步骤。这到底是怎么回事呢?我认为确认(首先是取消消息,然后是长时间运行的进程消息)在这里被交叉了。有什么像样的方法可以解决这个问题吗?还是我吠错了树?
可能值得注意的是,取消长时间运行的进程不是即时的。它将在下一个方便的时间点取消,因此几乎可以肯定,取消消息将在长时间运行的进程结束之前完成处理。
你可以做的是有类似消费者对的东西 - 第一个是长时间运行的进程,第二个是代理来终止长时间运行的进程。第一个将接收消息,处理完成后对其进行处理和ACK,如果检测到终止信号,也会进行ACK。该对中的代理显然会收到取消消息并杀死第一个,并且还会生成第一个代理的另一个实例。显然,这需要进程(消费者)在RMQ之外进行通信。
想到的另一件事(但我从未尝试过这样的事情)是您在消费者中将预取计数设置为 2,并且在"处理单个数据消息"时,将第二条消息发布到代理(转发),除非它是 CANCEL 消息,在这种情况下,您确认它们 - CANCEL 和 DATA(这样称呼它)消息, 中止处理后。
另一种选择可能是在"长时间运行的进程"中,您有两个使用者线程,每个使用者线程都使用自己的通道。
我遇到了同样的错误,因为在 BasicConsumption 方法中自动确认标志是正确的。现在我已将标志更改为 false,并且在长时间运行进程的 BasicAck 方法中没有收到错误。
channel.BasicConsume(queue: "test", autoAck: false, consumer: consumer);