并行运行异步方法

本文关键字:异步方法 运行 并行 | 更新日期: 2023-09-27 17:59:12

我有一个异步方法GetExpensiveThing(),它执行一些昂贵的I/O工作。这就是我使用它的方式:

// Serial execution
public async Task<List<Thing>> GetThings()
{
    var first = await GetExpensiveThing();
    var second = await GetExpensiveThing();
    return new List<Thing>() { first, second };
}

但由于这是一个昂贵的方法,我想并行执行这些调用。我本以为移动等待会解决这个问题:

// Serial execution
public async Task<List<Thing>> GetThings()
{
    var first = GetExpensiveThing();
    var second = GetExpensiveThing();
    return new List<Thing>() { await first, await second };
}

这不起作用,所以我把它们封装在一些任务中,这起作用:

// Parallel execution
public async Task<List<Thing>> GetThings()
{
    var first = Task.Run(() =>
    {
        return GetExpensiveThing();
    });
    var second = Task.Run(() =>
    {
        return GetExpensiveThing();
    });
    return new List<Thing>() { first.Result, second.Result };
}

我甚至试着在任务中玩等待和异步,但这真的很令人困惑,我运气不好。

是否有更好的并行运行异步方法,或者任务是一种好方法

并行运行异步方法

有没有更好的并行运行异步方法,或者任务是一种好方法?

是的,"最好"的方法是使用Task.WhenAll方法。然而,您的第二种方法应该并行运行。我已经创建了一个.NET Fiddle,这应该有助于揭示一些信息。第二种方法实际上应该是并行运行。我的小提琴证明了这一点!

考虑以下内容:

public Task<Thing[]> GetThingsAsync()
{
    var first = GetExpensiveThingAsync();
    var second = GetExpensiveThingAsync();
    return Task.WhenAll(first, second);
}

注意

最好使用"Async"后缀,而不是GetThingsGetExpensiveThing——我们应该分别使用GetThingsAsyncGetExpensiveThingAsync——源。

Task.WhenAll()有一种在没有节制/节流的情况下,同时启动大规模/大量任务时表现不佳的趋势。

如果您正在列表中执行大量任务并希望等待最终结果,那么我建议使用partition,并限制并行度。

我修改了Stephen Toub的博客优雅的现代LINQ方法:

public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)
{
    async Task AwaitPartition(IEnumerator<T> partition)
    {
        using (partition)
        {
            while (partition.MoveNext())
            {
                 await Task.Yield(); // prevents a sync/hot thread hangup
                 await funcBody(partition.Current);
            }
        }
    }
    return Task.WhenAll(
        Partitioner
            .Create(source)
            .GetPartitions(maxDoP)
            .AsParallel()
            .Select(p => AwaitPartition(p)));
}

它的工作原理很简单,取一个IEnumerable,将其分解为均匀的分区,并同时针对每个分区中的每个元素启动一个函数/方法。在任何时间,每个分区中的元素都不超过一个,但n个分区中有n个任务。

扩展用途:

await myList.ParallelForEachAsync(myFunc, Environment.ProcessorCount);

编辑:如果您需要更多选项,我现在会在Github的存储库中保留一些重载。它也在NetStandard的NuGet中。

编辑2:感谢Theodor在下面的评论,我能够通过使用await Task.Yield();来减轻写得不好的异步任务对并行性的阻塞。

您可以使用Task.WhenAll,当所有依赖的任务完成时返回

请在此处查看此问题以供参考

如果GetExpensiveThing正确地异步(意味着它不同步执行任何IO或CPU工作),那么调用这两个方法然后等待结果的第二个解决方案应该已经成功了。您也可以使用Task.WhenAll

然而,如果不是这样,您可以通过将每个任务发布到线程池并使用Task.WhenAll组合子来获得更好的结果,例如:

public Task<IList<Thing>> GetThings() =>
    Task.WhenAll(Task.Run(() => GetExpensiveThing()), Task.Run(() => GetExpensiveThing()));

(注意,我将返回类型更改为IList,以完全避免await s。)

您应该避免使用Result属性。与使用continuation的awaitTask.WhenAll不同,它会导致调用线程阻塞并等待任务完成。