具有中断条件的异步
本文关键字:异步 条件 中断 | 更新日期: 2023-09-27 18:35:59
我有一个接口:
public interface IRunner
{
Task<bool> Run(object o);
}
我有一些使用异步实现它的类,例如:
public class Runner1 : IRunner
{
public async Task<bool> Run(object o)
{
var num = await SomeExternalAsyncFunc(o);
return num < 12;
}
}
我需要实现一个在所有 Runners 类上并行运行的函数,并且只有在所有类都返回 true 时才返回 true。阅读本文和本文后,我想出了以下实现:
public class RunMannager
{
public async Task<bool> Run(ConcurrentBag<IRunner> runnersBag, object o)
{
var results = new ConcurrentBag<bool>();
var tasks = runnersBag.Select(async runner => results.Add(await runner.Run(o)));
await Task.WhenAll(tasks);
return results.All(result => result);
}
}
但是,我对此实现有两个问题:
我希望如果其中一个运行器已经返回 false,则该函数不应等待所有其他运行器。
有些跑步者可能永远不会回来,我想使用暂停。如果运行器在 10 秒内未返回任何内容,则将被视为返回 true。
也许使用反应式扩展会有所帮助?
这是一个带有 Rx 的版本,包括超时。编写的代码将在 LINQPad 中运行,并添加 Rx 引用。您可以尝试在 RunnerFactory 方法中构造 TrueRunner、FalseRunner 和 SlowRunner 实例。
关键思想:
- 使用
Select
和ToObservable()
启动异步任务并将其转换为 IObservable(有关更好的替代方案,请参见下文) - 使用
Timeout()
为每个任务添加超时,如果任务超时,则会根据请求替换 true 结果。 - 使用
Where
过滤掉True
结果 - 我们现在只会收到有关错误结果的通知。 - 如果流中有任何元素,
Any
返回 true,否则返回 false,因此在后续Select
中翻转其结果,我们就完成了。
法典:
void Main()
{
Observable.Merge(
RunnerFactory().Select(x => x.Run(null).ToObservable()
.Timeout(TimeSpan.FromSeconds(1), Observable.Return(true))))
.Where(res => !res)
.Any().Select(res => !res)
.Subscribe(
res => Console.WriteLine("Result: " + res),
ex => Console.WriteLine("Error: " + ex.Message));
}
public IEnumerable<IRunner> RunnerFactory()
{
yield return new FalseRunner();
yield return new SlowRunner();
yield return new TrueRunner();
}
public interface IRunner
{
Task<bool> Run(object o);
}
public class Runner : IRunner
{
protected bool _outcome;
public Runner(bool outcome)
{
_outcome = outcome;
}
public virtual async Task<bool> Run(object o)
{
var result = await Task<bool>.Factory.StartNew(() => _outcome);
return result;
}
}
public class TrueRunner : Runner
{
public TrueRunner() : base(true) {}
}
public class FalseRunner : Runner
{
public FalseRunner() : base(false) {}
}
public class SlowRunner : Runner
{
public SlowRunner() : base(false) {}
public override async Task<bool> Run(object o)
{
var result = await Task<bool>.Factory.StartNew(
() => { Thread.Sleep(5000); return _outcome; });
return result;
}
}
鉴于我使用的 Runner 实现,其中的 OnError 处理程序是多余的;如果您想在实现中抑制 Runner 错误,您可能需要考虑 Catch - 您可以像我使用 Timeout 一样替换IObservable<bool>
。
编辑 我认为值得一提的另一件事是,使用 Observable.StartAsync
是启动任务的更好方法,并且也会为您提供取消的支持。下面是一些修改后的代码,显示了 SlowRunner 如何支持取消。令牌由StartAsync
传入,如果订阅被释放,则会导致取消。如果检测到元素Any
这一切都是透明的。
void Main()
{
var runners = GetRunners();
Observable.Merge(runners.Select(r => Observable.StartAsync(ct => r.Run(ct, null))
.Timeout(TimeSpan.FromSeconds(10), Observable.Return(true))))
.Where(res => !res)
.Any().Select(res => !res)
.Subscribe(
res => Console.WriteLine("Result: " + res));
}
public static IEnumerable<IRunner> GetRunners()
{
yield return new Runner(false);
yield return new SlowRunner(true);
}
public interface IRunner
{
Task<bool> Run(CancellationToken ct, object o);
}
public class Runner : IRunner
{
protected bool _outcome;
public Runner(bool outcome)
{
_outcome = outcome;
}
public async virtual Task<bool> Run(CancellationToken ct, object o)
{
var result = await Task<bool>.Factory.StartNew(() => _outcome);
return result;
}
}
public class SlowRunner : Runner
{
public SlowRunner(bool outcome) : base(outcome)
{
}
public async override Task<bool> Run(CancellationToken ct, object o)
{
var result = await Task<bool>.Factory.StartNew(() =>
{
for(int i=0; i<5; i++)
{
if(ct.IsCancellationRequested)
{
Console.WriteLine("Cancelled");
}
ct.ThrowIfCancellationRequested();
Thread.Sleep(1000);
};
return _outcome;
});
return result;
}
}
使用Parallel.ForEach()
怎么样?下面的代码应该让你明白我的意思。
您可以定义CancellationTokenSource
CancellationTokenSource cancellationToken = new CancellationTokenSource();
ParallelOptions po = new ParallelOptions();
po.CancellationToken = cancellationToken.Token;
然后将po
传递给Parallel.ForEach
Parallel.ForEach(items, po, item =>
{
//...
if(num < 12)
cancellationToken.Cancel(false);
});
return !cancellationToken.IsCancellationRequested;