对一个消费者使用EasyNetQ多个Handler是行不通的

本文关键字:多个 EasyNetQ Handler 行不通 消费者 一个 | 更新日期: 2023-09-27 18:02:36

我们在c# . net (EasyNetQ客户端)中使用RabbitMQ来排队消息。

我想要一个消费者应用程序(c#控制台应用程序)监听一个队列,并为每个消息类型提供多个处理程序。

我实现了这个场景,我的代码在这里:

using (var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100")
                                    .Advanced)
{
    var queue = advancedBus.QueueDeclare("MyQueue");
    advancedBus.Consume(queue, x => x
               .Add<MessageType1>((message, info) =>
               {
                  Console.WriteLine("MessageType1 Body : " + message.Body.Body);
               })
               .Add<MessageType2>((message, info) => 
               {
                  Console.WriteLine(" MessageType2 Body: " + message.Body.Body);
               }).ThrowOnNoMatchingHandler = false);
}

My Problem:但是当我执行这个消费者时,它什么也不做。

我发布消息到队列,像这样:

using (var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced)
{
    var queue = advancedBus.QueueDeclare("MyQueue");
    if (advancedBus.IsConnected)
        advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, false,
            new Message<MessageType1>(change));
    else
        result = false;
}

有什么问题?

对一个消费者使用EasyNetQ多个Handler是行不通的

好了,测试完这段代码后,问题如下:

首先,您在注册消费后立即处理您的advancedBus。您需要记住,当您调用IAdvanceBus.Consume时,您只为每个消息注册一个回调。如果在注册后立即处置总线,则无法调用委托,因为连接已关闭。因此,您将删除兔子声明周围的using语句(不要忘记在完成后处理它):

var advancedBus = RabbitHutch.CreateBus("host=localhost;prefetchcount=100").Advanced

第二,immediate标志已被弃用,不应该使用,消息似乎没有进入队列。修改Publish为:

advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
                    new Message<MessageType1>(change));

同样,如果你从控制台应用程序运行这个,不要忘记使用Console.ReadKey,这样你的主线程就不会终止。

下面是一个工作代码示例:

static void Main()
{
    var change = new MessageType1();
    var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;
    ConsumeMessage(advancedBus);
    var queue = advancedBus.QueueDeclare("MyQueue");
    if (advancedBus.IsConnected)
    {
        advancedBus.Publish(Exchange.GetDefault(), queue.Name, true, false,
            new Message<MessageType1>(change));
    }
    else
    {
        Console.WriteLine("Can't connect");
    }
    Console.ReadKey();
}
private static void ConsumeMessage(IAdvancedBus advancedBus)
{
    var queue = advancedBus.QueueDeclare("MyQueue");
    advancedBus.Consume(queue, registration =>
    {
        registration.Add<MessageType1>((message, info) =>
        {
            Console.WriteLine("Body: {0}", message.Body);
        });
    });
}