NServiceBus 和 NHibernate EventListeners 在不同的线程上运行

本文关键字:线程 运行 NHibernate EventListeners NServiceBus | 更新日期: 2023-09-27 17:55:17

我试图实现的是我的网站发出一条消息并将其放在总线上,服务将其拾取并通过审计写入数据库,该审计会自动填充该行的添加者/更新者字段。

我通过使用 NServiceBus IMessageMutator 组件来执行此操作,该组件将用户 ID 写入来自 Thread.CurrentPrincipal 的消息标头,该消息标头来自我的 ASP.Net 应用程序中的登录用户。在我的服务中,我使用 IMessageModule 来提取此标头并将其绑定到 Thread.CurrentPrincipal。这很好用,在我的消息处理程序期间,我可以看到 Thread.CurrentPrincipal.Identity.Name 正确绑定到在 Web 应用程序中引发消息的用户 ID。

利用 NHibernate 的 IPreUpdateEventListener/IPreInsertEventListener,我在将每个实体写入数据库之前设置每个实体的 AddBy/UpdateBy。这在网站上完美运行,但在我的 NServiceBus 服务中,侦听器运行的线程与处理程序运行的线程不同,这意味着线程的 CurrentPrincipal 不再是我的 IMessageModule 中绑定的 ID。

可以看到NHibernate在调用堆栈中使用了分布式事务工厂,我怀疑这是我问题的原因。我不想失去事务性,这样如果提交失败,消息不会重试或放入错误队列,如果从队列中删除消息失败并且更新不会回滚到数据库。

我环顾了网络,所有示例都利用线程的 CurrentPrincipal 来绑定修改行的用户的 id。我正在寻找一种方法,可以将 NHibernate 侦听器与消息处理程序保持在同一线程上,或者将用户 ID 传递给侦听器,以便在将实体写入数据库之前将其绑定到实体。

这是我的听众,我省略了在其中找到的 Set 方法

public class EntityPersistenceListener : IPreUpdateEventListener, IPreInsertEventListener
{
    public bool OnPreUpdate(PreUpdateEvent @event)
    {
        var audit = @event.Entity as EntityBase;
        if (audit == null)
            return false;
        var time = DateTimeFactory.GetDateTime();
        var name = Thread.CurrentPrincipal.Identity.Name;
        Set(@event.Persister, @event.State, "AddedDate", audit.AddedDate);
        Set(@event.Persister, @event.State, "AddedBy", audit.AddedBy);
        Set(@event.Persister, @event.State, "UpdatedDate", time);
        Set(@event.Persister, @event.State, "UpdatedBy", name);
        audit.AddedDate = audit.AddedDate;
        audit.AddedBy = audit.AddedBy;
        audit.UpdatedDate= time;
        audit.UpdatedBy = name;
        return false;            
    }
}

这是 NServiceBus 消息模块,它提取 id 并将其绑定到当前线程的标识

public class TenantAndInstanceInfoExtractor : IMessageModule
{
    private readonly IBus _bus;
    public TenantAndInstanceInfoExtractor(IBus bus)
    {
        _bus = bus;
    }
    public void HandleBeginMessage()
    {
        var headers = _bus.CurrentMessageContext.Headers;
        if (headers.ContainsKey("TriggeredById"))
            Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(headers["TriggeredById"]), null);
        else
            Thread.CurrentPrincipal = new GenericPrincipal(new GenericIdentity(string.Empty), null);
    }
    public void HandleEndMessage()
    {
    }
    public void HandleError() { }
}

NServiceBus 和 NHibernate EventListeners 在不同的线程上运行

谢谢西蒙的帮助。在广泛地讨论了我的问题并讨论了 NServiceBus 内部的工作方式之后,我采纳了您的见解,并为 NServiceBus 实现了工作单元模块。

发生的事情是,我们依靠每条消息创建的事务将我们的 NHibernate 会话提交到数据库。这发生在利用线程池的分布式事务控制器上(具体来说,它发生在NHibernate.Transaction.AdoNetWithDistributedTransactionFactory.DistributedTransactionContext)上。

通过使用NServiceBus的IManageUnitsOfWork接口,我能够在与消息处理程序相同的线程上显式提交我们的事务,如下面的代码示例中所示。

作为任何未来读者的旁注,这里最好的解决方案是不使用 Thread.CurrentPrincipal,因为这个解决方案在多线程环境中失败了,就像它对我来说一样。

public class HiJumpNserviceBusUnitOfWork : IManageUnitsOfWork
{
    private readonly IUnitOfWork _uow;
    public HiJumpNserviceBusUnitOfWork(IUnitOfWork uow)
    {
        _uow = uow;
    }
    public void Begin()
    {
        _uow.ClearCache();
        _uow.BeginTransaction();
    }
    public void End(Exception ex = null)
    {
        if (ex != null)
        {
            _uow.ClearCache();
        }
        else
        {
            _uow.CommitTransaction();
        }
    }
}