具有中断条件的异步

本文关键字:异步 条件 中断 | 更新日期: 2023-09-27 18:35:59

我有一个接口:

public interface IRunner
{
    Task<bool> Run(object o);
}

我有一些使用异步实现它的类,例如:

public class Runner1 : IRunner
{
    public async Task<bool> Run(object o)
    {
        var num  = await SomeExternalAsyncFunc(o);
        return num < 12;
    }
}

我需要实现一个在所有 Runners 类上并行运行的函数,并且只有在所有类都返回 true 时才返回 true。阅读本文和本文后,我想出了以下实现:

public class RunMannager
{
    public async Task<bool> Run(ConcurrentBag<IRunner> runnersBag, object o)
    {
        var results = new ConcurrentBag<bool>();
        var tasks = runnersBag.Select(async runner => results.Add(await runner.Run(o)));
        await Task.WhenAll(tasks);
        return results.All(result => result);
     }
}

但是,我对此实现有两个问题:

  • 我希望如果其中一个运行器已经返回 false,则该函数不应等待所有其他运行器。

  • 有些跑步者可能永远不会回来,我想使用暂停。如果运行器在 10 秒内未返回任何内容,则将被视为返回 true。

也许使用反应式扩展会有所帮助?

具有中断条件的异步

这是一个带有 Rx 的版本,包括超时。编写的代码将在 LINQPad 中运行,并添加 Rx 引用。您可以尝试在 RunnerFactory 方法中构造 TrueRunner、FalseRunner 和 SlowRunner 实例。

关键思想:

  • 使用 SelectToObservable() 启动异步任务并将其转换为 IObservable(有关更好的替代方案,请参见下文)
  • 使用 Timeout() 为每个任务添加超时,如果任务超时,则会根据请求替换 true 结果。
  • 使用Where过滤掉True结果 - 我们现在只会收到有关错误结果的通知。
  • 如果流中有任何元素,Any返回 true,否则返回 false,因此在后续Select中翻转其结果,我们就完成了。

法典:

void Main()
{
    Observable.Merge(
        RunnerFactory().Select(x => x.Run(null).ToObservable()
        .Timeout(TimeSpan.FromSeconds(1), Observable.Return(true))))
        .Where(res => !res)
        .Any().Select(res => !res)
        .Subscribe(
            res => Console.WriteLine("Result: " + res),
            ex => Console.WriteLine("Error: " + ex.Message));                                           
}
public IEnumerable<IRunner> RunnerFactory()
{
    yield return new FalseRunner();
    yield return new SlowRunner();
    yield return new TrueRunner();
}
public interface IRunner
{
    Task<bool> Run(object o);
}
public class Runner : IRunner
{
    protected bool _outcome;
    public Runner(bool outcome)
    {
        _outcome = outcome;
    }
    public virtual async Task<bool> Run(object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => _outcome);     
        return result;
    }
}
public class TrueRunner : Runner
{
    public TrueRunner() : base(true) {}
}   
public class FalseRunner : Runner
{
    public FalseRunner() : base(false) {}
}   
public class SlowRunner : Runner
{
    public SlowRunner() : base(false) {}
    public override async Task<bool> Run(object o)
    {
        var result = await Task<bool>.Factory.StartNew(
            () => { Thread.Sleep(5000); return _outcome; });        
        return result;
    }
}   

鉴于我使用的 Runner 实现,其中的 OnError 处理程序是多余的;如果您想在实现中抑制 Runner 错误,您可能需要考虑 Catch - 您可以像我使用 Timeout 一样替换IObservable<bool>

编辑 我认为值得一提的另一件事是,使用 Observable.StartAsync 是启动任务的更好方法,并且也会为您提供取消的支持。下面是一些修改后的代码,显示了 SlowRunner 如何支持取消。令牌由StartAsync传入,如果订阅被释放,则会导致取消。如果检测到元素Any这一切都是透明的。

void Main()
{
    var runners = GetRunners();     
    Observable.Merge(runners.Select(r => Observable.StartAsync(ct => r.Run(ct, null))
                    .Timeout(TimeSpan.FromSeconds(10), Observable.Return(true))))
                    .Where(res => !res)
                    .Any().Select(res => !res)
                    .Subscribe(
                        res => Console.WriteLine("Result: " + res));
}
public static IEnumerable<IRunner> GetRunners()
{
    yield return new Runner(false);
    yield return new SlowRunner(true);
}
public interface IRunner
{
    Task<bool> Run(CancellationToken ct, object o);
}
public class Runner : IRunner
{
    protected bool _outcome;
    public Runner(bool outcome)
    {
        _outcome = outcome;
    }
    public async virtual Task<bool> Run(CancellationToken ct, object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => _outcome);
        return result;
    }
}
public class SlowRunner : Runner
{
    public SlowRunner(bool outcome) : base(outcome)
    {
    }
    public async override Task<bool> Run(CancellationToken ct, object o)
    {
        var result = await Task<bool>.Factory.StartNew(() => 
        {
            for(int i=0; i<5; i++)
            {
                if(ct.IsCancellationRequested)
                {
                    Console.WriteLine("Cancelled");                     
                }
                ct.ThrowIfCancellationRequested();
                Thread.Sleep(1000);
            };
            return _outcome;
        });
        return result;
    }
}

使用Parallel.ForEach()怎么样?下面的代码应该让你明白我的意思。

您可以定义CancellationTokenSource

CancellationTokenSource cancellationToken = new CancellationTokenSource();
ParallelOptions po = new ParallelOptions();
po.CancellationToken = cancellationToken.Token;

然后将po传递给Parallel.ForEach

Parallel.ForEach(items, po, item =>
{
   //...
   if(num < 12)
     cancellationToken.Cancel(false);
});
return !cancellationToken.IsCancellationRequested;