实现可重试块的正确完成

本文关键字:重试 实现 | 更新日期: 2023-09-27 18:21:22

Teaser:伙计们,这个问题不是关于如何实现重试策略。这是关于TPL数据流块的正确完成。

这个问题主要是我上一个问题ITargetBlock中重试策略的延续。这个问题的答案是@svick的智能解决方案,它利用了TransformBlock(源)和TransformManyBlock(目标)。剩下的唯一问题是以正确的方式完成此块:首先等待所有重试完成,然后完成目标块。以下是我最终得到的(这只是一个片段,不要太关注非线程安全的retries集):

var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);
                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;
                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);
    target.Complete();
});

其想法是执行某种轮询,并验证是否仍有消息等待处理,以及是否没有需要重试的消息。但在这个解决方案中,我不喜欢投票的想法。

是的,我可以将添加/删除重试的逻辑封装到一个单独的类中,甚至可以在重试集变空时执行一些操作,但如何处理target.InputCount > 0条件?当块没有挂起的消息时,没有这样的回调被调用,所以在一个具有小延迟的循环中验证target.ItemCount似乎是唯一的选择。

有人知道实现这一目标的更聪明的方法吗?

实现可重试块的正确完成

也许手动重置事件可以帮你。

TransformManyBlock 添加公共属性

private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }

给你:

var retries = new HashSet<RetryingMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);
                retries.Remove(message);
                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;
                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
    //Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();
    target.Complete();
});

我不确定你的target.InputCount设置在哪里。因此,在您更改target.InputCount的地方,您可以添加以下代码:

if(InputCount == 0)  Signal.Set();

将hwcverwe答案和JamieSee注释结合起来可能是理想的解决方案。

首先,您需要创建多个事件:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

然后,您必须创建一个观察者,并订阅TransformManyBlock,以便在发生相关事件时得到通知:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

可观察的可能很容易:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;
        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }
        public void OnCompleted() {
            completedEvent.Set();
        }
        public void OnError(Exception error) {
            //TODO
        }
        public void OnNext(T value) {
            //TODO
        }
    }

您可以等待信号或完成(耗尽所有源项目),或同时等待

 source.Completion.ContinueWith(async _ => {
            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!
            target.Complete();
        });

您可以检查WaitAll的结果值,以了解设置了哪个事件,并做出相应的反应。您还可以将其他事件添加到代码中,将它们传递给观察器,以便观察器在需要时设置它们。您可以区分自己的行为,并在出现错误时做出不同的响应,例如