响应式扩展和重试

本文关键字:重试 扩展 响应 | 更新日期: 2023-09-27 18:12:20

今天早上我看到了一系列的文章。它从这个问题开始,这导致了GitHub上的原始示例和源代码。

我稍微重写了一下,所以我可以开始在控制台和服务应用程序中使用它:

public static class Extensions
{
    static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory());
    // Licensed under the MIT license with <3 by GitHub
    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 8, 16...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
    /// <summary>
    /// A linear strategy which starts with 1 second and then 2, 3, 4...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> LinearStrategy = n => TimeSpan.FromSeconds(1*n);
    /// <summary>
    /// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
    /// </summary>
    /// <param name="source">The source observable.</param>
    /// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
    /// <param name="strategy">The strategy to use in backing off, exponential by default.</param>
    /// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
    /// <param name="scheduler">The scheduler.</param>
    /// <returns>
    /// A cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates.
    /// </returns>
    [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? ExponentialBackoff;
        scheduler = scheduler ?? Scheduler;
        if (retryOnError == null)
            retryOnError = e => true;
        int attempt = 0;
        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}

现在测试它是如何工作的,我写了这个小程序:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var cts = new CancellationTokenSource();
        var sched = new TaskPoolScheduler(new TaskFactory());
        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(scheduler: sched, strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);
        while (!cts.IsCancellationRequested)
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message); 
                },
                () =>
                {
                    cts.Cancel();
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }
}

最初,我认为订阅事件将自动触发所有后续的退役。但事实并非如此,所以我必须实现一个取消令牌并循环,直到它发出所有reties都已用尽的信号。

另一个选项是使用AutoResetEvent:
class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var auto = new AutoResetEvent(false);
        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);
        while (!auto.WaitOne(1))
        {
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message);
                },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                    auto.Set();
                });
        }
    }
}

在这两个场景中,它将显示以下行:

Action 0
Error: Attempted to divide by zero.
Action 1
Result: yolo
End Processing after 2 attempts

我对这群人的问题是:这是使用这个扩展的最佳方式吗?或者是否有一种订阅可观察对象的方法,这样它就会根据重试次数重新启动自己?

最终更新

根据Brandon的建议,以下是正确的订阅方式:

internal class Program
{
    #region Methods
    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(strategy: Extensions.ExponentialBackoff, retryOnError: exception => exception is DivideByZeroException, scheduler: Scheduler.Immediate)
            .Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }
    #endregion
}

输出将略有不同:

Action 0
Action 1
Result: yolo
End Processing after 2 attempts

这是一个非常有用的扩展。下面是另一个如何使用它的例子,其中策略和错误处理是通过委托给出的。

internal class Program
{
    #region Methods
    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(
            strategy: i => TimeSpan.FromMilliseconds(1),
            retryOnError: exception =>
            {
                if (exception is DivideByZeroException)
                {
                    Console.WriteLine("Tried to divide by zero");
                    return true;
                }
                return false;
            },
            scheduler: Scheduler.Immediate).Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("Succeeded after {0} attempts", tryCount);
                });
    }
    #endregion
}
输出:

Action 0
Tried to divide by zero
Action 1
Result: yolo
Succeeded after 2 attempts

响应式扩展和重试

是的,Rx通常是异步的,所以在编写测试时,您需要等待它完成(否则Main只是在调用Subscribe后退出)。

另外,确保你订阅了调用source.RetryWithBackoffStrategy(...)产生的可观察对象。这会产生一个新的可观察对象,它具有重试语义。

在这种情况下,最简单的解决方案是直接使用Wait:
try
{
  var source2 = source.RetryWithBackoffStrategy(/*...*/);
  // blocks the current thread until the source finishes
  var result = source2.Wait(); 
  Console.WriteLine("result=" + result);
}
catch (Exception err)
{
  Console.WriteLine("uh oh", err);
}

如果你使用像NUnit(它支持异步测试)这样的东西来编写测试,那么你可以这样做:

[Test]
public async Task MyTest()
{
    var source = // ...;
    var source2 = source.RetryWithBackoffStrategy(/*...*/);
    var result = await source2; // you can await observables
    Assert.That(result, Is.EqualTo(5));
}