是否可以在一些.net Parallel.ForEach()代码中执行一些async/await ?

本文关键字:执行 代码 async await ForEach net 是否 Parallel | 更新日期: 2023-09-27 18:04:03

给定以下代码,在Parallel.ForEach中执行async/await是否OK ?

Parallel.ForEach(names, name =>
{
    // Do some stuff...
    var foo = await GetStuffFrom3rdPartyAsync(name);
    // Do some more stuff, with the foo.
});

还是我需要知道一些问题?

编辑:不知道这是否编译,顺便说一句。只是伪代码…自言自语

是否可以在一些.net Parallel.ForEach()代码中执行一些async/await ?

不,把asyncParalell.Foreach结合在一起是没有意义的。

考虑下面的例子:

private void DoSomething()
{
    var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
    Parallel.ForEach(names, async(name) =>
    {   
        await Task.Delay(1000);
        Console.WriteLine("Name {0} completed",name);
    });
    Console.WriteLine("Parallel ForEach completed");
}

您期望的输出是什么?

Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Parallel ForEach completed

那是不会发生的。它将输出:

Parallel ForEach completed
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...

为什么?因为当ForEach首先到达await时,方法实际上返回,Parallel.ForEach不知道它是异步的,它运行到完成!await之后的代码在另一个线程上作为延续运行,而不是 "并行处理线程"

Stephen toub在这里提到了这个

从名称来看,我假设GetStuffFrom3rdPartyAsync是I/o绑定的。Parallel类专门用于cpu绑定的代码。

在异步环境中,您可以启动多个任务,然后(异步地)使用Task.WhenAll等待它们全部完成。由于您从一个序列开始,因此最简单的方法可能是每个元素项目到一个异步操作中,然后等待所有这些操作:

await Task.WhenAll(names.Select(async name =>
{
  // Do some stuff...
  var foo = await GetStuffFrom3rdPartyAsync(name);
  // Do some more stuff, with the foo.
}));

一个更接近的替代方案可能是:

static void ForEach<T>(IEnumerable<T> data, Func<T, Task> func)
{
    var tasks = data.Select(item => 
        Task.Run(() => func(item)));
    Task.WaitAll(tasks.ToArray());
}
// ... 
ForEach(names, name => GetStuffFrom3rdPartyAsync(name));

理想情况下,你不应该使用像Task.WaitAll这样的阻塞调用,如果你可以使整个方法链调用async,在当前调用堆栈上"一路向下":

var tasks = data.Select(item => 
    Task.Run(() => func(item)));
await Task.WhenAll(tasks.ToArray());

此外,如果在GetStuffFrom3rdPartyAsync中不做任何cpu限制的工作,Task.Run可能是冗余的:

var tasks = data.Select(item => func(item));

正如@Sriram Sakthivel指出的那样,使用Parallel.ForEach与异步lambda存在一些问题。Steven Toub的ForEachASync可以做同样的事情。他在这里谈到了它,但这里是代码:

public static class Extensions
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                                               using (partition) while (partition.MoveNext()) await body(partition.Current);
            }));
    }
}

它使用Partitioner类来创建一个负载平衡分区(doco),并允许您使用dop参数指定要运行多少线程。看看它和Parallel.ForEach的区别。试试下面的代码:

 class Program
    {
        public static async Task GetStuffParallelForEach()
        {
            var data = Enumerable.Range(1, 10);
            Parallel.ForEach(data, async i =>
            {
                await Task.Delay(1000 * i);
                Console.WriteLine(i);
            });
        }
        public static async Task GetStuffForEachAsync()
        {
            var data = Enumerable.Range(1, 10);
            await data.ForEachAsync(5, async i =>
            {
                await Task.Delay(1000 * i);
                Console.WriteLine(i);
            });
        }
        static void Main(string[] args)
        {
            //GetStuffParallelForEach().Wait(); // Finished printed before work is complete
            GetStuffForEachAsync().Wait(); // Finished printed after all work is done
            Console.WriteLine("Finished");
            Console.ReadLine();
        }

如果运行GetStuffForEachAsync,程序等待所有工作完成。如果运行GetStuffParallelForEach,在工作完成之前将打印Finished行。