使用 TPL/异步的递归异步调用等待

本文关键字:异步 调用 等待 递归 使用 TPL | 更新日期: 2023-09-27 17:55:51

我正在考虑使用C#异步功能(TPL/async/await)以递归方式处理分层结构。以下是我正在尝试做的事情的概述

我有一个作业集合要处理,如下所示。每个工作都有事情要做,可以选择有一个或多个孩子也有事情要做。所有父作业和子作业都调用相同的函数来执行实际的"工作",并且此函数在"异步"中(下面的代码)

/*
 *  Jobs Collection
 *  |
 *  |__ Job1
 *  |    |__ Job4
 *  |    |     |__ Job7
 *  |    |
 *  |    |__ Job5
 *  |
 *  |__ Job2
 *  |    |__ Job6
 *  |
 *  |__ Job3
 *  |
 */
  1. 层次结构中有 3 个级别。

  2. 我想开始并行处理第一级(作业 1、作业 2、作业 3)。

  3. 一旦它们并行启动,每个单独的作业将开始处理本身,等待其处理完成(重要),然后继续递归处理其子项,直到层次结构结束。子项依赖于父项处理的数据,因此它们等待父项处理完成。

  4. 实际"作业"(由父项和子项调用)的处理异步发生,因为调用方法异步工作 - 因此不需要"新线程"(Task.StartNew())。

以下是我用来演示场景的示例代码 -

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs
    // first level 
    Parallel.ForEach(jobs,
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
                (job) => ExecuteJob(job));
}
private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "'t", job.Name, "'t", Thread.CurrentThread.ManagedThreadId);
    Task t = GetDataAsync(job);
    t.Wait(); // needed such that parent response is received before children start over (?).

    if (job.Children != null)
    {
        job.Children.ToList().ForEach((r) =>
        {
            r.ParentResponse = job.Response; // Children need parent's response
            ExecuteJob(r);
        });
    }
}
private async Task GetDataAsync(WebJob j)
{
    // This is just test code. Ideally it would be an external call to some "async" method
    await Task.Delay(1000);
    j.Response = string.Format("{0} complete", j.Name);
    Console.ForegroundColor = ConsoleColor.Cyan;
    Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "'t", j.Name, "'t", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("--------------");
}
private WebJob[] CreateWebJobs()
{
    return new WebJob[] {
        new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1", 
            Children = new WebJob[] 
            {
                new WebJob() 
                { 
                    Id=2, Name = "Job2", ExecURL = "http://url2", 
                    Children = new WebJob[] 
                    {
                        new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
                    }
                },
                new WebJob() 
                { 
                    Id=3, Name = "Job3", ExecURL = "http://url3" 
                }
            }
        },
        new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}                
    };
}
  • 流程方法从启动所有"第一级"作业开始平行。
  • 执行作业方法接管并递归地转到子项完成所有处理

这工作正常,但我不相信这种递归异步模式是否是一种有效的方法。我想避免t.Wait()。我已经在t上尝试了ContinueWith,在我的理解中似乎没有什么不同,我还阅读了有关ForEachAsync模式的信息,并想知道这是否合适。此解决方案最终将成为 ASP.NET Web API 服务。对这种递归异步模式有什么想法吗?

使用 TPL/异步的递归异步调用等待

如果GetDataAsync是您拥有的唯一阻塞操作,则可以始终使用异步编程,从而避免Parallel.ForEach调用或阻塞Wait调用的需要。

public async Task Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs
    await Task.WhenAll(jobs.Select(ExecuteJob));
}
private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "'t", job.Name, "'t", Thread.CurrentThread.ManagedThreadId);
    await GetDataAsync(job);
    if (job.Children != null)
    {
        var childTasks = job.Children.Select(r =>
        {
            r.ParentResponse = job.Response;
            return ExecuteJob(r);
        });
        await Task.WhenAll(childTasks);
    }
}

编辑:如果顶级方法应该阻止(而不是冒着让消费者即发即弃的风险),请执行以下操作:

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs
    Task.WaitAll(jobs.Select(ExecuteJob));
}

由于您的内核是异步的,因此根本不应该使用并行或多线程。你想要的是没有并行性的并发 - 即异步并发,通常使用 Task.WhenAll 完成。

这是双重的,因为您计划部署到 ASP.NET,其中并行性会显着降低您的可伸缩性。

public async Task ProcessAsync()
{
  WebJob[] jobs = CreateWebJobs();
  await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}
private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
  Console.ForegroundColor = ConsoleColor.DarkYellow;
  Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "'t", job.Name, "'t", Thread.CurrentThread.ManagedThreadId);
  await GetDataAsync(job);
  if (job.Children != null)
  {
    var childTasks = job.Children.Select(async x =>
    {
      x.ParentResponse = job.Response; // Children need parent's response
      await ExecuteJobAsync(x);
    });
    await Task.WhenAll(childTasks);
  }
}