EasyNetQ -通过非通用消息从IAdvanceBus消费消息

本文关键字:消息 IAdvanceBus -通 EasyNetQ | 更新日期: 2023-09-27 18:10:25

是的,我们可以使用这个订阅id的自动订阅功能和正常订阅方法,但是这个解决方案在RMQ队列和交换中有点难看。很难跟踪和分析这些信息。

我使用高级总线并创建了自己的交换和队列。我成功地发表了文章,但消费的部分有点失望。目前,它是这样使用的:

    IAdvanceBus = bus.Advanced.Consume(queueName, registration => 
{
    registration.Add<MESSAGE1>((message, info) => { ProcessMessage(MESSAGE1) }) 
    registration.Add<MESSAGE2>((message, info) => { ProcessMessage(MESSAGE2) }) 
    registration.Add<MESSAGE3>((message, info) => { ProcessMessage(MESSAGE3) }) 
    registration.Add<MESSAGE4>((message, info) => { ProcessMessage(MESSAGE4) }) 
    registration.Add<MESSAGE5>((message, info) => { ProcessMessage(MESSAGE5) });
});

这很好,但是如果你有100个听众呢?

我检查了注册类型IHandlerRegistration它只使用通用的一个,我们可以有非通用的方式吗?

:

    IAdvanceBus = bus.Advanced.Consume(queueName, registration => 
{
    registration.Add(typeof(MESSAGE1), info => { ProcessMessage(MESSAGE1) })    
    registration.Add(typeof(MESSAGE2), info => { ProcessMessage(MESSAGE2) })    
    registration.Add(typeof(MESSAGE3), info => { ProcessMessage(MESSAGE3) })    
});

通过这种方式,我们可以扫描使用该消息的程序集。

在另一边,我通过构造总线进行注册:

RabbitHutch.CreateBus(connectionString, registeredServices => {
IEasyNetQLogger logger;
MyCustomHandlerCollection myHandlers = myDIContainer.Resolve<IMyHandlers>();
registeredServices.Register<IHandlerCollection>(s => 
{ 
    logger = s.Resolve<IEasyNetQLogger>();
    return myHandlers;
});
registeredServices.Register<IHandlerRegistration>(s => myHandlers});});

但是它不尊重我的注册,因为当我看到advance总线的代码消费:消费代码是从工厂创建的,而不是从容器读取。我相信这是根本原因。

EasyNetQ -通过非通用消息从IAdvanceBus消费消息

为了解决这个需求,我使用了IAdvanceBus中的这个方法:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

我滚动了自己的消息调度程序,并对队列中的任何消息进行反序列化。集合调度程序将确定消息类型并调度到使用反射创建的特定处理程序。