是否可以在一些.net Parallel.ForEach()代码中执行一些async/await ?
本文关键字:执行 代码 async await ForEach net 是否 Parallel | 更新日期: 2023-09-27 18:04:03
给定以下代码,在Parallel.ForEach
中执行async/await
是否OK ?
。
Parallel.ForEach(names, name =>
{
// Do some stuff...
var foo = await GetStuffFrom3rdPartyAsync(name);
// Do some more stuff, with the foo.
});
还是我需要知道一些问题?
编辑:不知道这是否编译,顺便说一句。只是伪代码…自言自语
不,把async
和Paralell.Foreach
结合在一起是没有意义的。
考虑下面的例子:
private void DoSomething()
{
var names = Enumerable.Range(0,10).Select(x=> "Somename" + x);
Parallel.ForEach(names, async(name) =>
{
await Task.Delay(1000);
Console.WriteLine("Name {0} completed",name);
});
Console.WriteLine("Parallel ForEach completed");
}
您期望的输出是什么?
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
Parallel ForEach completed
那是不会发生的。它将输出:
Parallel ForEach completed
Name Somename3 completed
Name Somename8 completed
Name Somename4 completed
...
为什么?因为当ForEach
首先到达await
时,方法实际上返回,Parallel.ForEach
不知道它是异步的,它运行到完成!await
之后的代码在另一个线程上作为延续运行,而不是 "并行处理线程"
Stephen toub在这里提到了这个
从名称来看,我假设GetStuffFrom3rdPartyAsync
是I/o绑定的。Parallel
类专门用于cpu绑定的代码。
在异步环境中,您可以启动多个任务,然后(异步地)使用Task.WhenAll
等待它们全部完成。由于您从一个序列开始,因此最简单的方法可能是将每个元素项目到一个异步操作中,然后等待所有这些操作:
await Task.WhenAll(names.Select(async name =>
{
// Do some stuff...
var foo = await GetStuffFrom3rdPartyAsync(name);
// Do some more stuff, with the foo.
}));
一个更接近的替代方案可能是:
static void ForEach<T>(IEnumerable<T> data, Func<T, Task> func)
{
var tasks = data.Select(item =>
Task.Run(() => func(item)));
Task.WaitAll(tasks.ToArray());
}
// ...
ForEach(names, name => GetStuffFrom3rdPartyAsync(name));
理想情况下,你不应该使用像Task.WaitAll
这样的阻塞调用,如果你可以使整个方法链调用async
,在当前调用堆栈上"一路向下":
var tasks = data.Select(item =>
Task.Run(() => func(item)));
await Task.WhenAll(tasks.ToArray());
此外,如果在GetStuffFrom3rdPartyAsync
中不做任何cpu限制的工作,Task.Run
可能是冗余的:
var tasks = data.Select(item => func(item));
正如@Sriram Sakthivel指出的那样,使用Parallel.ForEach
与异步lambda存在一些问题。Steven Toub的ForEachASync
可以做同样的事情。他在这里谈到了它,但这里是代码:
public static class Extensions
{
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition) while (partition.MoveNext()) await body(partition.Current);
}));
}
}
它使用Partitioner
类来创建一个负载平衡分区(doco),并允许您使用dop
参数指定要运行多少线程。看看它和Parallel.ForEach
的区别。试试下面的代码:
class Program
{
public static async Task GetStuffParallelForEach()
{
var data = Enumerable.Range(1, 10);
Parallel.ForEach(data, async i =>
{
await Task.Delay(1000 * i);
Console.WriteLine(i);
});
}
public static async Task GetStuffForEachAsync()
{
var data = Enumerable.Range(1, 10);
await data.ForEachAsync(5, async i =>
{
await Task.Delay(1000 * i);
Console.WriteLine(i);
});
}
static void Main(string[] args)
{
//GetStuffParallelForEach().Wait(); // Finished printed before work is complete
GetStuffForEachAsync().Wait(); // Finished printed after all work is done
Console.WriteLine("Finished");
Console.ReadLine();
}
如果运行GetStuffForEachAsync
,程序等待所有工作完成。如果运行GetStuffParallelForEach
,在工作完成之前将打印Finished
行。