RabbitMQ Pub/Sub -订阅者无法接收消息
本文关键字:消息 Pub Sub RabbitMQ | 更新日期: 2023-09-27 17:52:58
我使用示例代码实现使用"fanout"交换类型的发布/订阅。但是如下代码所示,订阅者没有显示已发布的"Hello Word"消息。
Publisher.cs
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0)
? string.Join(" ", args)
: "info: Hello World!");
}
Subscriber.cs
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
noAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
代码Ref: https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html
正如您所发现的那样,它实际上可以工作,您只需要首先启动订阅者。为什么?答案也在您提供的链接中。我在这里引用一部分:
但是我们的记录器不是这样的。我们想知道所有的日志消息,而不仅仅是其中的一个子集。我们也只对当前正在运行的消息不在旧消息中。要解决这个问题我们需要两件事。
首先,无论何时连接到Rabbit ,我们都需要一个新的空队列。要做到这一点,我们可以创建一个随机名称的队列,或者更好-让服务器为我们选择一个随机队列名称。其次,一旦断开消费者的连接,队列就应该断开自动删除。
这基本上意味着只有在启动订阅者时才创建队列,并且只有在这时,交换器才有一个队列来实际地将消息放入其中。因为您首先启动发布者,所以没有队列让消息在