MassTransit Producer消耗一个事件需要60秒

本文关键字:事件 一个 60秒 Producer MassTransit | 更新日期: 2023-09-27 18:17:07

我有两个项目:

  1. 一个名为Service.Endpoints的大规模传输(Topshelf Windows Service)
  2. 控制台应用程序客户端试图与TestConsole通信。

总体要求如下:

  1. TestConsole发送SolveProblemCommand
  2. Service.Endpoints消耗命令并发布ProblemSolvedEvent
  3. TestConsole占用事件

问题:

以上所有步骤都可以正常工作,除了步骤3 (TestConsole消费事件)只在事件发布后60秒左右发生。首先显示以下错误(60秒后),然后消费者收到呼叫。

Timeout waiting for consumer to exit: rabbitmq://localhost:5672/bus-PC-NAME-TestConsole.vshost-4sboyydjz6ne6mz6bdky1b7ad4?durable=false&autodelete=true&prefetch=16

等待消费者退出超时:rabbitmq://localhost:5672/problemsolved.queue?预取= 16

代码:

Service.Endpoints.csproj

bus = BusConfigurator.ConfigureBus(new AppSettings(), (cfg, host) =>
{
    cfg.ReceiveEndpoint(host, RabbitMqConstants.SolveProblemQueue, e =>
    {
        e.Consumer<SolveProblemCommandConsumer>(NinjectConfig.CurrentKernel);
    });
});
bus.Start();
class SolveProblemCommandConsumer : IConsumer<SolveProblemCommand>
{
    public async Task Consume(ConsumeContext<SolveProblemCommand> context)
    {
        var controller = new Controller(context.Message.Problem);
        var results = await controller.Start(context.Message.Options);
        await context.Publish(new ProblemSolvedEvent(results));
    }
}

TestConsole.csproj

var bus = BusConfigurator.ConfigureBus(new AppSettings(), (cfg, host) =>
{
    cfg.ReceiveEndpoint(host, RabbitMqConstants.ProblemSolvedQueue, e =>
    {
        e.Consumer<ProblemSolvedEventConsumer>();
    });
});
var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.SolveProblemQueue}");
var endpoint = await bus.GetSendEndpoint(sendToUri);
bus.Start();
await endpoint.Send(someMessage);

class ProblemSolvedEventConsumer : IConsumer<ProblemSolvedEvent>
{
    public async Task Consume(ConsumeContext<ProblemSolvedEvent> context)
    {
        ...
    }
}

MassTransit Producer消耗一个事件需要60秒

TestConsole.csproj项目中,我重用相同的IBusControl对象来发送命令和使用事件。一旦我创建了两个独立的总线对象,它就像预期的那样工作了。