如何向特定订阅者重新发送消息

本文关键字:新发送 消息 | 更新日期: 2023-09-27 17:58:22

背景:

我们有一个基于MassTransit的消息传递子系统。它工作得很好,只需一些小问题就可以发送数千条信息。到目前为止,失败的消息被自动推送到xxx_error队列。这也很有效。

我们甚至有自己的基于web的管理控制台,可以查看消息队列并重新发送失败的消息。该工具基于BusDriver示例,直接在MSMQ上工作-它将消息从xxx_error复制到xxx并重新发送。

当前情况:

我们考虑迁移到RabbitMQ,它看起来更快、更可扩展。但当然,MSMQ管理控制台变得毫无用处,我不想再编写另一个版本的控制台来处理RabbitMQ队列。我宁愿使用一个通用路由,将失败的消息放入我自己的存储库中,该存储库将独立于MSMQ传输。

听起来很简单。这就是我们有一个具体问题的地方。

失败消息的存储库将包含消息正文和消息订阅者名称(以及其他属性)。稍后,我可以转到存储库,反序列化消息并将其重新发送给特定的订阅者。

但是,我们不想bus.Publish( msg ),因为消息会再次击中所有订阅者,而不是之前失败的订阅者。我们想要的是将消息重新发送给一个订阅者。

这似乎有可能:

senderbus.GetEndpoint( new Uri( "rabbitmq://servername/subscriber1" ) ).Send( msg );

senderbus.GetEndpoint( new Uri( "msmq://localhost/subscriber1" ) ).Send( msg );

(取决于使用的运输工具)。通过这种方法,消息被传递给唯一的特定订户。

问题是:

这是推荐的方法吗?我们有什么替代方案?

可能的问题是,这种方法可能会忽略有关当前订阅的信息,并将消息直接传递到订阅队列。但是,订阅者可能不再订阅该类型的消息。所以,代码应该是这样的:

if ( subscriber1 still subscribes to messages of msg.GetType() ) <- how do to this?
   senderbus.GetEndpoint( new Uri( "rabbitmq://servername/subscriber1" ) ).Send( msg );

提前感谢您对此发表任何评论。

如何向特定订阅者重新发送消息

我不会。

不确定您的管理控制台在做什么,但RabbitMQ的控制台可以做任何事情,但可以将消息从一个队列移动到另一个队列。我自己也在关注这个功能,希望有一个脚本可以专门用于此。

需要记住的一件棘手的事情是端点!=消费者端点为该总线上的所有消费者提供服务,虽然端点地址存储和重新发布到其中很简单,但单个消费者不会。

人们所说的是幂等消费者:http://en.wikipedia.org/wiki/Idempotence#Computer_science_meaning

我不会担心消费者没有订阅特定的消息,使用静态类型来建立您正在考虑升级的路由信息。将这些消息作为升级过程的一部分来处理。