使用 TPL/异步的递归异步调用等待
本文关键字:异步 调用 等待 递归 使用 TPL | 更新日期: 2023-09-27 17:55:51
我正在考虑使用C#异步功能(TPL/async/await)以递归方式处理分层结构。以下是我正在尝试做的事情的概述
我有一个作业集合要处理,如下所示。每个工作都有事情要做,可以选择有一个或多个孩子也有事情要做。所有父作业和子作业都调用相同的函数来执行实际的"工作",并且此函数在"异步"中(下面的代码)
/*
* Jobs Collection
* |
* |__ Job1
* | |__ Job4
* | | |__ Job7
* | |
* | |__ Job5
* |
* |__ Job2
* | |__ Job6
* |
* |__ Job3
* |
*/
层次结构中有 3 个级别。
我想开始并行处理第一级(作业 1、作业 2、作业 3)。
一旦它们并行启动,每个单独的作业将开始处理本身,等待其处理完成(重要),然后继续递归处理其子项,直到层次结构结束。子项依赖于父项处理的数据,因此它们等待父项处理完成。
实际"作业"(由父项和子项调用)的处理异步发生,因为调用方法异步工作 - 因此不需要"新线程"(Task.StartNew())。
以下是我用来演示场景的示例代码 -
public void Process()
{
WebJob[] jobs = CreateWebJobs(); // dummy jobs
// first level
Parallel.ForEach(jobs,
new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
(job) => ExecuteJob(job));
}
private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
Console.ForegroundColor = ConsoleColor.DarkYellow;
Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "'t", job.Name, "'t", Thread.CurrentThread.ManagedThreadId);
Task t = GetDataAsync(job);
t.Wait(); // needed such that parent response is received before children start over (?).
if (job.Children != null)
{
job.Children.ToList().ForEach((r) =>
{
r.ParentResponse = job.Response; // Children need parent's response
ExecuteJob(r);
});
}
}
private async Task GetDataAsync(WebJob j)
{
// This is just test code. Ideally it would be an external call to some "async" method
await Task.Delay(1000);
j.Response = string.Format("{0} complete", j.Name);
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "'t", j.Name, "'t", Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("--------------");
}
private WebJob[] CreateWebJobs()
{
return new WebJob[] {
new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1",
Children = new WebJob[]
{
new WebJob()
{
Id=2, Name = "Job2", ExecURL = "http://url2",
Children = new WebJob[]
{
new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
}
},
new WebJob()
{
Id=3, Name = "Job3", ExecURL = "http://url3"
}
}
},
new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}
};
}
- 流程方法从启动所有"第一级"作业开始平行。
- 执行作业方法接管并递归地转到子项完成所有处理
这工作正常,但我不相信这种递归异步模式是否是一种有效的方法。我想避免t.Wait()。我已经在t上尝试了ContinueWith,在我的理解中似乎没有什么不同,我还阅读了有关ForEachAsync模式的信息,并想知道这是否合适。此解决方案最终将成为 ASP.NET Web API 服务。对这种递归异步模式有什么想法吗?
如果GetDataAsync
是您拥有的唯一阻塞操作,则可以始终使用异步编程,从而避免Parallel.ForEach
调用或阻塞Wait
调用的需要。
public async Task Process()
{
WebJob[] jobs = CreateWebJobs(); // dummy jobs
await Task.WhenAll(jobs.Select(ExecuteJob));
}
private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
Console.ForegroundColor = ConsoleColor.DarkYellow;
Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "'t", job.Name, "'t", Thread.CurrentThread.ManagedThreadId);
await GetDataAsync(job);
if (job.Children != null)
{
var childTasks = job.Children.Select(r =>
{
r.ParentResponse = job.Response;
return ExecuteJob(r);
});
await Task.WhenAll(childTasks);
}
}
编辑:如果顶级方法应该阻止(而不是冒着让消费者即发即弃的风险),请执行以下操作:
public void Process()
{
WebJob[] jobs = CreateWebJobs(); // dummy jobs
Task.WaitAll(jobs.Select(ExecuteJob));
}
由于您的内核是异步的,因此根本不应该使用并行或多线程。你想要的是没有并行性的并发 - 即异步并发,通常使用 Task.WhenAll
完成。
这是双重的,因为您计划部署到 ASP.NET,其中并行性会显着降低您的可伸缩性。
public async Task ProcessAsync()
{
WebJob[] jobs = CreateWebJobs();
await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}
private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
Console.ForegroundColor = ConsoleColor.DarkYellow;
Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "'t", job.Name, "'t", Thread.CurrentThread.ManagedThreadId);
await GetDataAsync(job);
if (job.Children != null)
{
var childTasks = job.Children.Select(async x =>
{
x.ParentResponse = job.Response; // Children need parent's response
await ExecuteJobAsync(x);
});
await Task.WhenAll(childTasks);
}
}