使用Ninject与EasyNetQ/RabbitMQ消息处理程序
本文关键字:RabbitMQ 消息处理 程序 EasyNetQ Ninject 使用 | 更新日期: 2023-09-27 18:29:11
我正试图将EasyNetQ与Ninject一起使用来记录消息。
我已经成功地将Ninject设置为EasyNetQ DI(我认为),但当消息到达没有无参数构造函数的处理程序时(例如,我需要在其中绑定一个存储库),它无法解决。或者至少我相信这就是问题所在,因为我在控制台上遇到了一个非常普遍的错误。
我告诉EasyNetQ像这样使用Ninject:
RabbitHutch.SetContainerFactory(() => new NinjectAdapter(container));
我想这就是我需要设置的全部内容。Ninject适配器是来自EasyNetQ的适配器。
我的处理程序如下所示:
public class ProfileDeactivatedUpdateHandler : IConsume<ProfileDeactivatedUpdate>
{
private readonly IProfileRepository _profileRepository;
public ProfileDeactivatedUpdateHandler(IProfileRepository profileRepository)
{
_profileRepository = profileRepository;
}
public void Consume(ProfileDeactivatedUpdate message)
{
//Do Stuff.
}
}
如果我添加了一个无参数构造函数,而是通过ServiceLocator(Ugh)设置Ninject,那么它就可以工作了。处理程序被称为fine,我可以通过ServiceLocator找到我的存储库,所以我知道至少Ninject知道这个存储库。
当它试图处理消息时弹出的错误是。
System.AggregateException: One or more errors occurred. ---> System.Exception: E
xception of type 'System.Exception' was thrown.
at EasyNetQ.ReflectionHelpers.DefaultFactories`1.Get()
at EasyNetQ.ReflectionHelpers.CreateInstance[T]()
at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.Dispatch[TMe
ssage,TConsumer](TMessage message)
at EasyNetQ.RabbitBus.<>c__DisplayClass6`1.<Subscribe>b__5(T msg)
--- End of inner exception stack trace ---
---> (Inner Exception #0) System.Exception: Exception of type 'System.Exception'
was thrown.
at EasyNetQ.ReflectionHelpers.DefaultFactories`1.Get()
at EasyNetQ.ReflectionHelpers.CreateInstance[T]()
at EasyNetQ.AutoSubscribe.DefaultAutoSubscriberMessageDispatcher.Dispatch[TMe
ssage,TConsumer](TMessage message)
at EasyNetQ.RabbitBus.<>c__DisplayClass6`1.<Subscribe>b__5(T msg)<---
所以我已经解决了这个问题。显然,当EasyNetQ实例化一个处理程序时,它并没有使用指定的DI框架(Boo!)。您必须单独指定一个"MessageDispatcher"实现。令人惊叹的哦但是只有Autofac的实现,而不是Ninject(Boo!x2)。
因此,我的pull请求在这里,用于实现Ninject的事件调度器的代码在这里:https://github.com/mikehadlow/EasyNetQ/pull/309
现在,您可以将该类复制并粘贴到您的项目中。然后在您的NinjectModule中或在任何设置绑定的地方,您可以执行以下操作:
//Bind Message Dispatcher to Ninject event message dispatcher
NinjectMessageDispatcher messageDispatcher = new NinjectMessageDispatcher(Kernel);
Bind<IAutoSubscriberMessageDispatcher>().ToConstant(messageDispatcher);
然后,无论你在哪里设置你的订阅,你都可以做如下操作(注意,这是使用EasyNetQ.的IConsume接口自动订阅的
var subscriber = new AutoSubscriber(_serviceBus, "ProfileServices");
subscriber.AutoSubscriberMessageDispatcher = _dispatcher;
subscriber.Subscribe(Assembly.GetExecutingAssembly());
重要的部分是将MessageDispatcher手动设置为ninject调度器的实例。你想如何实现这一点取决于你自己。
我认为在未来,EasyNetQ可能需要自动做到这一点。显然,如果您将工厂设置为使用Ninject,那么您的处理程序很可能也想使用Ninject。
哦!您可以修改上面的代码以使用您选择的DI。这需要为使用EasyNetQ的任何DI(我认为)完成,而不仅仅是Ninject。