确保事件最终发布到消息队列系统的最佳方法
本文关键字:系统 队列 最佳 方法 消息 事件 确保 | 更新日期: 2023-09-27 17:55:43
请想象你有如下方法:
public void PlaceOrder(Order order)
{
this.SaveOrderToDataBase(order);
this.bus.Publish(new OrderPlaced(Order));
}
将订单保存到数据库后,事件将发布到消息队列系统,以便同一台计算机或另一台计算机上的其他子系统可以处理该事件。
但是,如果调用失败this.bus.Publish(new OrderPlaced(Order))
会发生什么情况?还是机器在将订单保存到数据库后崩溃?事件未发布,其他子系统无法处理它。这是不可接受的。如果发生这种情况,我需要确保事件最终发布。
我可以使用哪些可接受的策略?哪个是最好的?
注意:我不想使用分布式事务。
编辑:
保罗·萨西克非常接近,我认为我可以达到100%。我是这么想的:
首先在数据库中创建一个表事件,如下所示:
CREATE TABLE Events (EventId int PRIMARY KEY)
你可能想要使用 guid 而不是 int,也可以使用序列或标识。
然后执行以下伪代码:
open transaction
save order and event via A SINGLE transaction
in case of failure, report error and return
place order in message queue
in case of failure, report error, roll back transaction and return
commit transaction
所有事件都必须包含事件 ID。当事件订阅者收到事件时,他们首先检查数据库中是否存在 EventId。
通过这种方式,您可以获得 100% 的回报,而不仅仅是 99.999%
此视频和此博客文章中介绍了确保事件最终发布到消息队列系统的正确方法
基本上你需要将要发送到数据库的消息存储在执行 bussines 逻辑操作的同一事务中,然后将消息异步发送到总线,并在另一个事务中从数据库中删除消息:
public void PlaceOrder(Order order)
{
BeginTransaction();
Try
{
SaveOrderToDataBase(order);
ev = new OrderPlaced(Order);
SaveEventToDataBase(ev);
CommitTransaction();
}
Catch
{
RollbackTransaction();
return;
}
PublishEventAsync(ev);
}
async Task PublishEventAsync(BussinesEvent ev)
{
BegintTransaction();
try
{
await DeleteEventAsync(ev);
await bus.PublishAsync(ev);
CommitTransaction();
}
catch
{
RollbackTransaction();
}
}
由于 PublishEventAsync 可能会失败,因此您必须稍后重试,因此您需要一个后台进程来重试失败的发送,如下所示:
foreach (ev in eventsThatNeedsToBeSent) {
await PublishEventAsync(ev);
}
您可以使this.bus.Publish
调用成为this.SaveOrderToDataBase
数据库事务的一部分。这意味着this.SaveOrderToDataBase
在事务范围内执行,如果数据库调用失败,则永远不会调用 mq,如果 mq 调用失败,则回滚数据库事务,使两个系统处于一致状态。如果两个调用都成功,则提交数据库事务。
伪代码:
open transaction
save order via transaction
in case of failure, report error and return
place order in message queue
in case of failure, report error, roll back transaction and return
commit transaction
您没有提到任何特定的数据库技术,因此这里是有关事务的wiki文章的链接。即使您不熟悉交易,这也是一个不错的起点。还有一点好消息:它们并不难实施。