在公共交通中实施消息跟踪 3.

本文关键字:消息 跟踪 施消息 | 更新日期: 2023-09-27 18:30:17

我已经将MassTransit与Rabbit队列一起使用了一段时间,并且已经实现了一个类似于这个问题中的IMessageTracker:如何在公共交通中记录失败的消息? 跟踪消息重试,并在重试发生时为我们提供一些监视,以及消息在重试序列中的位置 - 允许我们获取有关第三方端点何时导致我们重试的一些统计信息。这意味着我必须针对每个消息 ID 存储一个重试序列值,并在每次重试时检查它/递增它,然后在调用 MessageWasReceivedSuccess 时删除消息 ID。 此外,如果消息在所有重试后都失败,我们可以监视故障发生的时间(诚然,我们不需要消息跟踪器,但是当我为重试创建跟踪器时,它变得明智)。

我想升级到 MassTransit 3,

因为在这种情况下,新的重试策略是有益的,但我找不到使用 MassTransit 3 实现我的消息跟踪器的方法。

我目前的方法是使用消费观察器配置我的处理程序,将我的总线配置为接收观察器,如下所示:

sbc.ReceiveEndpoint(host, config.QueueName, ep =>
{
    ep.Consumer(() => _container.Resolve<IEventConsumer>());
    ep.Observer(new EventConsumeObserver());
});        
_busControl.ConnectReceiveObserver(new ReceiveObserver(container.Resolve<IMonitoringPublisher>()));

观察者如下:

public class ReceiveObserver : IReceiveObserver
{
    public ReceiveObserver()
    {
    }
    public async Task PreReceive(ReceiveContext context)
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - PreReceive Observed");
    }
    public async Task PostReceive(ReceiveContext context)
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - PostReceive Observed");
    }
    public async Task PostConsume<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType) where T : class
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - PostConsume Observed");
    }
    public async Task ConsumeFault<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception) where T : class
    {
        await Console.Out.WriteLineAsync($"ReceiveObserver - ConsumeFault Observed - {exception.Message}.");
    }
    public async Task ReceiveFault(ReceiveContext context, Exception exception)
    {
        await Console.Out.WriteLineAsync("ReceiveObserver - ReceiveFault Observed");
    }
}
public class EventConsumeObserver : IObserver<ConsumeContext<IEvent>>
{
    public void OnNext(ConsumeContext<IEvent> value)
    {
        Console.WriteLine("EventConsumeObserver - OnNext");
    }
    public void OnError(Exception error)
    {
        Console.WriteLine("EventConsumeObserver - OnError");
    }
    public void OnCompleted()
    {
        Console.WriteLine("EventConsumeObserver - OnCompleted");
    }
}

这确实允许我监视何时发生重试(稍加修饰),但我在消息跟踪器上找不到与 MessageWasReceivedSuccess 方法等效的方法。此外,当我运行一个测试来模拟超时引发的异常时,输出似乎没有接近已完成的方法 - 跟踪如下:

(模拟超时)

ReceiveObserver - PreReceive Observed
Request timeout
EventConsumeObserver - OnNext
ReceiveObserver - PostConsume Observed
Request timeout
EventConsumeObserver - OnNext
ReceiveObserver - PostConsume Observed
Request timeout
EventConsumeObserver - OnNext
ReceiveObserver - PostConsume Observed
ReceiveObserver - ConsumeFault Observed - Call timed out

(未模拟超时)

ReceiveObserver - PreReceive Observed
Request success
EventConsumeObserver - OnNext
我希望每次重试时都会有

更明显的调用来消耗错误,或者在重试时调用 OnError,并在每次事件后调用 OnComplete,但事实似乎并非如此。

我是否缺少某种类型的观察者,或者是否有其他方法可以挂接到重试,以便我可以监视它们何时发生以及在成功之前执行了多少次重试?

在公共交通中实施消息跟踪 3.

您可能需要查看 IConsumeObserver ,它与实际消息消耗更符合,而不是消息接收。虽然两者都携带有趣的信息,但应为使用者抛出的每个异常调用消费观察器。还会为使用的每种消息类型调用使用前/后使用方法。