Parallel.ForEach和async正在等待

本文关键字:在等待 async ForEach Parallel | 更新日期: 2023-09-27 17:58:44

我有这样的方法:

public async Task<MyResult> GetResult()
{
    MyResult result = new MyResult();
    foreach(var method in Methods)
    {
        string json = await Process(method);
        result.Prop1 = PopulateProp1(json);
        result.Prop2 = PopulateProp2(json);
    }
    return result;
}

然后我决定使用Parallel.ForEach:

public async Task<MyResult> GetResult()
{
    MyResult result = new MyResult();
    Parallel.ForEach(Methods, async method =>
    {
        string json = await Process(method);    
        result.Prop1 = PopulateProp1(json);
        result.Prop2 = PopulateProp2(json);
    });
    return result;
}

但现在我有一个错误:

异步模块或处理程序已完成,而异步操作仍处于挂起状态。

Parallel.ForEach和async正在等待

asyncForEach不能很好地配合使用。特别是,您的async lambda正被转换为async void方法。有很多原因可以避免async void(正如我在MSDN的一篇文章中所描述的);其中之一是您无法轻易检测async lambda何时完成。ASP.NET将在不完成async void方法的情况下看到您的代码返回,并(适当地)抛出异常。

您可能想要做的是同时处理数据,只是不以并行处理。并行代码几乎不应该在ASP.NET上使用

public async Task<MyResult> GetResult()
{
  MyResult result = new MyResult();
  var tasks = Methods.Select(method => ProcessAsync(method)).ToArray();
  string[] json = await Task.WhenAll(tasks);
  result.Prop1 = PopulateProp1(json[0]);
  ...
  return result;
}

.NET6最终添加了Parallel.ForEachAsync,这是一种调度异步工作的方法,允许您控制并行度:

var urlsToDownload = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://twitter.com/shahabfar"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urlsToDownload, options, async (url, token) =>
{
    var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
    var response = await client.GetAsync(url, token);
    // The request will be canceled in case of an error in another URL.
    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);
        await response.Content.CopyToAsync(target);
    }
});

或者,使用AsyncEnumerator NuGet包,您可以执行以下操作:

using System.Collections.Async;
public async Task<MyResult> GetResult()
{
    MyResult result = new MyResult();
    await Methods.ParallelForEachAsync(async method =>
    {
        string json = await Process(method);    
        result.Prop1 = PopulateProp1(json);
        result.Prop2 = PopulateProp2(json);
    }, maxDegreeOfParallelism: 10);
    return result;
}

其中CCD_ 9是一种扩展方法。

啊,好吧。我想我知道现在发生了什么。async method =>是一个"async void",它是"fire and forget"(不建议用于事件处理程序以外的任何程序)。这意味着调用者无法知道它何时完成。。。因此,GetResult在操作仍在运行时返回。尽管我第一个答案的技术细节不正确,但这里的结果是一样的:GetResult正在返回,而ForEach启动的操作仍在运行。真正可以做的唯一一件事是不在Process上使用await(这样lambda就不再是async),并等待Process完成每次迭代。但是,这将使用至少一个线程池线程来完成这一操作,从而稍微加重池的压力——可能会使ForEach的使用变得毫无意义。我根本不会使用Parallel.ForEach…