如何安排任务,使它们不能并发运行,包括异步延续
本文关键字:运行 并发 包括 不能 异步 延续 何安排 任务 | 更新日期: 2023-09-27 18:02:00
对于下面的代码,是否可以为Task doThing
的实例定义调度程序、创建和延续设置?
我希望能够调度doThing
的多个实例,以便它们实际上独占地从其他实例运行(即使它们正在等待其他子任务)。
private static async Task doThing(object i)
{
Console.WriteLine("in do thing {0}", (int)i);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", (int)i);
}
static void Main(string[] args)
{
CancellationTokenSource source = new CancellationTokenSource();
ConcurrentExclusiveSchedulerPair pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Current);
Task Task1 = Task.Factory.StartNew((Func<object,Task>)doThing, 1, source.Token, TaskCreationOptions.AttachedToParent, pair.ExclusiveScheduler).Unwrap();
Task Task2 = Task.Factory.StartNew((Func<object, Task>)doThing, 2, source.Token, TaskCreationOptions.AttachedToParent, pair.ExclusiveScheduler);
Task Task3 = doThing(3);
Task Task4 = Task.Factory.StartNew(async (i) =>
{
Console.WriteLine("in do thing {0}", (int)i);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", (int)i);
}, 4, source.Token, TaskCreationOptions.None, pair.ExclusiveScheduler);
Task.WaitAll(Task1, Task2, Task3, Task4);
Console.ReadKey();
return;
}
TPL TaskSchedulers一次只能看到异步方法的一个同步段,所以你不能简单地用schedulers来做。但是您可以使用更高级的原语做到这一点。我经常使用的一个是TPL数据流。
首先安装NuGet包:
Install-Package Microsoft.Tpl.Dataflow
然后使用以下代码:
private static async Task doThing(object i) {
Console.WriteLine("in do thing {0}", (int)i);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", (int)i);
}
static void Main(string[] args) {
CancellationTokenSource source = new CancellationTokenSource();
var exclusivityBlock = new ActionBlock<Func<Task>>(f => f(), new ExecutionDataflowBlockOptions { CancellationToken = source.Token }};
exclusivityBlock.Post(() => doThing(1));
exclusivityBlock.Post(() => doThing(2));
exclusivityBlock.Post(() => doThing(3));
exclusivityBlock.Post(
async () => {
Console.WriteLine("in do thing {0}", 4);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", 4);
});
exclusivityBlock.Complete();
exclusivityBlock.Completion.Wait();
Console.WriteLine("Done");
Console.ReadKey();
return;
}
此代码缺少针对每个发布的工作项的单独Task。如果这很重要,您可以使用以下示例:
internal static class Program {
private static async Task doThing(object i) {
Console.WriteLine("in do thing {0}", (int)i);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", (int)i);
}
private static void Main(string[] args) {
CancellationTokenSource source = new CancellationTokenSource();
var exclusivityBlock = CreateTrackingBlock<Func<Task>>(
f => f(), new ExecutionDataflowBlockOptions { CancellationToken = source.Token });
var task1 = exclusivityBlock.PostWithCompletion(() => doThing(1));
var task2 = exclusivityBlock.PostWithCompletion(() => doThing(2));
var task3 = exclusivityBlock.PostWithCompletion(() => doThing(3));
var task4 = exclusivityBlock.PostWithCompletion(
async () => {
Console.WriteLine("in do thing {0}", 4);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", 4);
});
Task.WaitAll(task1, task2, task3, task4);
Console.WriteLine("Done");
Console.ReadKey();
return;
}
private static ActionBlock<Tuple<T, TaskCompletionSource<object>>> CreateTrackingBlock<T>(Func<T, Task> action, ExecutionDataflowBlockOptions options = null) {
return new ActionBlock<Tuple<T, TaskCompletionSource<object>>>(
async tuple => {
try {
await action(tuple.Item1);
tuple.Item2.TrySetResult(null);
} catch (Exception ex) {
tuple.Item2.TrySetException(ex);
}
},
options ?? new ExecutionDataflowBlockOptions());
}
internal static Task PostWithCompletion<T>(this ActionBlock<Tuple<T, TaskCompletionSource<object>>> block, T value) {
var tcs = new TaskCompletionSource<object>();
var tuple = Tuple.Create(value, tcs);
block.Post(tuple);
return tcs.Task;
}
}
请注意,这只是有点费力,因为Dataflow主要不是为跟踪单个提交而设计的,而是为了跟踪整个过程。因此,虽然上面的答案很好,但Stephen Cleary的答案可能更简单,因此更可取。
给出下面的代码,是否可以为Task doThing的实例定义调度程序、创建和延续设置?
坏消息是:不,没有办法这样做。为非lambda任务定义"调度器"是没有意义的。不需要创建选项,并且延续选项是在延续上设置的,而不是在任务本身上。
好消息是:你不需要这个行为。
需要异步同步。内置的方法是使用SemaphoreSlim
,如下所示:
SemaphoreSlim mutex = new SemaphoreSlim(1);
private static async Task doThingAsync(object i)
{
await mutex.WaitAsync();
try
{
Console.WriteLine("in do thing {0}", (int)i);
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("out of do thing {0}", (int)i);
}
finally
{
mutex.Release();
}
}
就我个人而言,我认为finally
语法是尴尬的,所以我定义了一个IDisposable
,并使用using
代替。
如果你需要更强大的功能,Stephen Toub有一个异步协调原语系列,我的AsyncEx库中有一套完整的原语。这两个资源都包含一个带有Task<IDisposable> WaitAsync()
成员的AsyncLock
,因此您可以使用using
而不是finally
。