.NET AMQP消息传递模式问题

本文关键字:问题 模式 消息传递 AMQP NET | 更新日期: 2023-09-27 18:24:09

我使用RabbitMQ创建了一个小类,它在主题交换上实现了发布/订阅消息传递模式。在这个pub/sub之上,我有方法和属性:

  1. void Send(Message,Subject)-将消息发布到目标主题,供任何订阅者处理。

  2. MessageReceivedEvent-订阅此消息传递实例上的消息接收事件(消息传递实例在创建时绑定到所需的订阅主题)。

  3. SendWaitReply(Message,Subject)-发送一条消息并阻止,直到收到与已发送消息id匹配的相关id的回复消息(或超时)。这本质上是pub/sub模式之上的请求/应答或RPC机制。

由于系统的设计方式,我选择的消息传递模式在某种程度上是一成不变的。我意识到我可以使用回复队列来缓解SendWaitPReply的潜在问题,但这打破了一些要求。

现在我的问题是:

  • 对于Listen事件,当监听器在单个线程中运行时,消息将通过事件订阅者同步处理。在处理大量消息时(即在后端进程中使用来自web api的事件),这会导致一些严重的性能问题。我正在考虑传入一个回调函数,而不是订阅一个事件,然后使用Task或Threadpool并行调度回调集合。线程安全现在显然是调用者关心的问题。我不确定这种做法是否正确。

  • 对于SendWaitPReply事件,我构建了一个看起来像是黑客的解决方案,它从消息侦听器循环中获取所有入站消息,如果它们包含非空的相关guid,则将它们放在ConcurrentDictionary中。然后在SendWaitPReply方法中,我轮询ConcurrentDictionary,以查找包含与已发送消息的Id匹配的密钥的消息(或在某段时间后超时)。如果有一种更快/更好的方法可以做到这一点,我真的很想调查一下。也许有一种方法可以向所有当前被阻止的SendWaitReply方法发出信号,表明有新消息可用,他们都应该检查自己的ID,而不是连续轮询?


2014年10月15日更新

经过大量详尽的研究,我得出的结论是,在RabbitMQ或AMQP的范围内,没有"官方"机制/助手/库来直接处理我上面为SendWaitAppy提出的特定用例。我将暂时坚持我当前的解决方案(并研究更健壮的实现)。有人建议我使用提供的RPC功能,但不幸的是,这只适用于您希望在每个请求的基础上使用独占回调队列的情况。这打破了我的一个主要要求,即所有消息(请求和回复)在同一主题交换中都可见。

为了进一步澄清,SendWaitReply请求的典型消息对的格式为:

  • Topic_Exchange.Service_A=>some_command=>Topic_EExchange.Service_B
  • Topic_Exchange.Service_B=>some_command_respoy=>Topic_EExchange.Service_A

这为我提供了一种强大的调试和日志记录技术,我只需在Topic_Exchange上设置一个侦听器#,并且可以通过各种服务查看用于跟踪非常深入的"调用堆栈"的所有系统流量。

TL;DR-下面的当前问题

从体系结构级别后退—消息侦听器循环仍然存在问题。我已经尝试了EventingBasicConsumer,但仍然看到一个块。我的类的工作方式是调用方订阅该类实例提供的委托。消息循环在该委托上触发事件,然后这些订阅者处理消息。似乎我需要一种不同的方式来将消息事件处理程序传递到实例中,这样它们就不会全部位于一个强制同步处理的委托之后。

.NET AMQP消息传递模式问题

很难说为什么您的代码在没有示例的情况下被阻塞,但为了防止在消费时被阻塞,您应该使用EventingBasicConsumer。

var consumer = new EventingBasicConsumer;
consumer.Received += (s, delivery) => { /* do stuff here */ };
channel.BasicConsume(queue, false, consumer);

需要注意的是,如果您使用autoAck=false(就像我所做的那样),那么您需要确保在执行通道时锁定通道。BasicAck,否则您可能会遇到.NET库中的并发问题。

对于SendWaitReply,如果您只使用RabbitMQ客户端库中包含的SimpleRpcClient,您可能会有更好的运气:

var props = channel.CreateBasicProperties();
// Set your properties
var client = new RabbitMQ.Client.MessagePatterns.SimpleRpcClient(channel, exchange, ExchangeType.Direct, routingKey);
IBasicProperties replyProps;
byte[] response = client.Call(props, body, out replyProps);

SimpleRpcClient将处理创建临时队列、关联ID等问题,而不是构建自己的队列。如果你发现你想做一些更高级的事情,这个来源也是一个很好的参考。