使用异步lambda并行foreach
本文关键字:并行 foreach lambda 异步 | 更新日期: 2023-09-27 18:01:38
我想并行处理一个集合,但是我在实现它时遇到了麻烦,因此我希望得到一些帮助。
如果我想调用c#中标记为async的方法,在并行循环的lambda中,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
计数为0时出现问题,因为所有创建的线程实际上只是后台线程,Parallel.ForEach
调用不等待完成。如果我删除async关键字,方法看起来像这样:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
它可以工作,但它完全禁用了await的聪明,我必须做一些手动异常处理。(删除为简洁).
我如何实现一个Parallel.ForEach
循环,在lambda中使用await关键字?这可能吗?
并行的原型。ForEach方法以Action<T>
作为参数,但我希望它等待我的异步lambda。
如果你只想要简单的并行,你可以这样做:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
如果您需要更复杂的内容,请查看Stephen Toub的ForEachAsync
帖子。
您可以使用AsyncEnumerator NuGet Package中的ParallelForEachAsync
扩展方法:
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
免责声明:我是AsyncEnumerator库的作者,它是开源的,并在麻省理工学院的许可下,我发布这条消息只是为了帮助社区。
新的。net 6 api之一是Parallel。ForEachAsync,一种调度异步工作的方法,允许你控制并行度:
var urls = new []
{
"https://dotnet.microsoft.com",
"https://www.microsoft.com",
"https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
var targetPath = Path.Combine(Path.GetTempPath(), "http_cache", url);
var response = await client.GetAsync(url);
if (response.IsSuccessStatusCode)
{
using var target = File.OpenWrite(targetPath);
await response.Content.CopyToAsync(target);
}
});
Scott Hanselman博客中的另一个例子。
源代码,供参考。
使用SemaphoreSlim
可以实现并行控制。
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
await throttler.WaitAsync();
try
{
var response = await GetData(item);
bag.Add(response);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
var count = bag.Count;
从其他答案和被接受的答案所引用的文章中编译的最简单的可能扩展方法:
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync();
try
{
await asyncAction(item).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
更新:这是一个简单的修改,也支持取消令牌,就像在评论中请求的(未经测试)
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
var tasks = source.Select(async item =>
{
await throttler.WaitAsync(cancellationToken);
if (cancellationToken.IsCancellationRequested) return;
try
{
await asyncAction(item, cancellationToken).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
}
我的轻量级ParallelForEach async实现。
特点:
- 节流(最大并行度)。
- 异常处理(完成时会抛出聚合异常)。
- 内存高效(不需要存储任务列表)。
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.TrySetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
使用例子:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
我为此创建了一个扩展方法,该方法利用SemaphoreSlim并允许设置最大并行度
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
示例用法:await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
在接受的回答中不需要ConcurrentBag。下面是一个没有它的实现:
var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);
任何"//一些预先的东西"还有&;//一些帖子&;可以进入GetData实现(或调用GetData的其他方法)
除了更短,没有使用"async void"Lambda,这是一个反模式
下面设置为使用IAsyncEnumerable
,但可以修改为使用IEnumerable
,只需更改类型并删除"await"在foreach
。比起创建无数并行任务,然后等待它们全部完成,这更适合于大型数据集。
public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
{
ActionBlock<T> block = new ActionBlock<T>(
action,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism,
BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
});
await foreach (T item in enumerable)
{
await block.SendAsync(item).ConfigureAwait(false);
}
block.Complete();
await block.Completion;
}
对于更简单的解决方案(不确定是否最优),您可以简单地将Parallel.ForEach
嵌套在Task
中-如
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 }
Task.Run(() =>
{
Parallel.ForEach(myCollection, options, item =>
{
DoWork(item);
}
}
ParallelOptions
将为您做节流,开箱使用。
我在一个现实世界的场景中使用它在后台运行一个很长的操作。这些操作是通过HTTP调用的,并且它被设计成在长操作运行时不会阻塞HTTP调用。
- 为长时间后台操作调用HTTP。
- 操作在后台启动。
- 用户获取状态ID,该ID可用于使用另一个HTTP调用检查状态。
- 后台操作更新其状态。
这样,CI/CD调用不会因为长时间的HTTP操作而超时,而是每隔x秒循环一次状态,而不会阻塞进程