同一个应用程序可以同时是NServiceBus的发布者/订阅者吗?

本文关键字:发布者 NServiceBus 应用程序 同一个 | 更新日期: 2023-09-27 18:18:59

我是消息传递体系结构的新手,所以我可能会以错误的方式进行。但我想通过解决一个小问题,慢慢地在我的团队中引入NServiceBus。

日程中的约会有状态。两个用户可以在同一个应用程序的同一个日程中查看同一个约会。他们通过中央服务器上的远程会话启动这个应用程序。因此,如果用户1更新了约会的状态,我希望用户2看到新的状态'real time'。

如果你愿意,我做了一个新的控制台应用程序来模拟这个或做一个概念证明。通过NuGet,我得到了NServiceBus和NServiceBus。因为我从文档中了解到我两者都需要。我知道在生产代码中不建议将所有内容放在同一个程序集中,但是发布者和订阅者很可能最终会在同一个程序集中…

在程序方法Main类中,我编写了以下代码:

BusConfiguration configuration = new BusConfiguration();
configuration.UsePersistence<InMemoryPersistence>();
configuration.UseSerialization<XmlSerializer>();
configuration.UseTransport<MsmqTransport>();
configuration.TimeToWaitBeforeTriggeringCriticalErrorOnTimeoutOutages(new TimeSpan(1, 0, 0));
ConventionsBuilder conventions = configuration.Conventions();
conventions.DefiningEventsAs(t => t.Namespace != null
    && t.Namespace.Contains("Events"));
using (IStartableBus bus = Bus.Create(configuration))
{
    bus.Start();
    Console.WriteLine("Press key");
    Console.ReadKey();
    bus.Publish<Events.AppointmentStateChanged>(a =>
    {
        a.AppointmentID = 1;
        a.NewState = "New state";
    });
    Console.WriteLine("Event published.");
    Console.ReadKey();
}

在类EndPointConfig方法Customize中我添加了:

configuration.UsePersistence<InMemoryPersistence>();
configuration.UseSerialization<XmlSerializer>();
configuration.UseTransport<MsmqTransport>();
ConventionsBuilder conventions = configuration.Conventions();
conventions.DefiningEventsAs(t => t.Namespace != null
    && t.Namespace.Contains("Events"));

AppointmentStateChanged是Events文件夹中的一个简单类,如下所示:

public class AppointmentStateChanged: IEvent {
    public int AppointmentID { get; set; }
    public string NewState { get; set; }
}

任命mentstatechangedhandler是事件处理器:

public class AppointmentStateChangedHandler : IHandleMessages<Events.AppointmentStateChanged> {
public void Handle(Events.AppointmentStateChanged message) {
        Console.WriteLine("AppointmentID: {0}, changed to state: {1}", 
            message.AppointmentID, 
            message.NewState);
    }
}

如果我启动一个控制台应用程序,一切都很好。我看到处理程序处理事件。但是如果我尝试启动第二个控制台应用程序,它会崩溃:System.Messaging.MessageQueueException(请求操作的超时时间已过期)。所以我一定是做错了什么,让我再次猜测,我不理解更高层次的东西。谁能告诉我正确的方向?

所有内容都在名称空间议程更新中,除了事件类,它在议程更新中。事件名称空间。

更新2 步骤:

  • 复制的议程更新解决方案(到议程更新文件夹)
  • 在副本中,我将App.Config中的messageendpointmapping中的EndPoint属性更改为"议程更新"我得到MSMQ异常:"队列不存在或您没有足够的权限来执行操作"
  • 在副本中,我将这行代码添加到EndPointConfig: configuration.EndpointName("议程更新2");我得到MSMQ异常:"队列不存在或您没有足够的权限来执行操作"
  • 在副本中,我将这行代码添加到Program类的Main方法中:configuration.EndpointName("AgendaUpdates2");

->我用原始和复制的解决方案启动了2个visual studio来测试它。然后在IDE中启动两个控制台应用程序。

同一个应用程序可以同时是NServiceBus的发布者/订阅者吗?

我不太确定为什么你会得到特定的异常,但我可以解释为什么你想做的事情失败了。问题是在同一个应用程序中没有发布者和订阅者(这是可能的,也是有用的);问题是您在同一台机器上运行同一个应用程序的两个实例。

NServiceBus依赖于队列技术(在你的例子中是MSMQ),为了使一切正常工作,每个应用程序都需要有自己独特的队列。当您启动两个相同的实例时,它们都试图共享同一个队列。

您可以修改一些东西,以使您的场景工作,并更好地理解排队是如何工作的:

  1. 更改第二个实例的EndPointName
  2. 在另一台机器上运行第二个实例
  3. 将发布者和订阅者分离到单独的进程

无论您采用哪种方式,您都需要调整您的messageendpointappings(在消费者/订阅者上)以反映主机/发布者队列所在的位置(消息类型的"所有者"):

http://docs.particular.net/nservicebus/messaging/message-owner configuring-endpoint-mapping

基于你的更新编辑

我知道这是一个测试设置/概念证明,但将这两个部署(相同代码)视为发布者/主机和订阅者/客户端仍然是有用的。因此,我们将原始文件称为主机,将副本称为客户机。我假设您不希望每个订阅另一个(至少在这个基本测试中)。

另外,确保在您的机器上以管理员身份运行两个ide。我不确定这是不是干扰。

在副本中,我将App.Config中的messageendpointmapping中的端点属性更改为"agentaupdates2",我得到了MSMQ异常:"队列不存在或您没有足够的权限来执行操作"

由于副本是客户端,您希望将其映射指向主机。所以这应该是"议程更新"(省略"2")。

在副本中,我将这行代码添加到EndPointConfig: configuration.EndpointName(" agenda aupdates2 ");我得到了MSMQ异常:"队列不存在或您没有足够的权限来执行操作"

在副本中,我将这行代码添加到Program类的Main方法中:configuration.EndpointName(" agenda aupdates2 ");按

键后又出现原来的异常

我最初没有注意到这一点,但是您不需要两次配置端点。我相信你的EndPointConfig没有被调用,因为它只在通过NSB主机可执行文件托管时使用。你可以直接删除这个类。

这听起来很合理,但请记住,如果你的副本是订阅者,就不应该发布,所以在它开始后不要按任何键(只按原始的键)。

如果您希望发布者同时也是消息的接收者,则需要在配置中指定。

这在本文中有清楚的解释,你的问题的解决方案完全在文章的末尾。