异步处理 IEnumerable,并发性有限
本文关键字:并发 处理 IEnumerable Task 异步 | 更新日期: 2023-09-27 17:56:22
我有一个IEnumerable<Task<T>>
,其中T
表示某个事件(事件的自然语言类型,而不是event
类型的事件)。
我想异步处理这些,因为它们是 IO 绑定的,并限制并发量,因为处理事件的数据库不能处理超过一小撮(比如 6 个)并发处理请求(它们非常重)这样做的正确策略是什么?
如果我有
private Task processeventasync(T someevent) {
...
}
foreach(t in tasks) {
await processeventsasync(await t)
}
我没有并发性。
如果我用信号量保护东西,我实际上是在保护线程并用锁保护它们,而不是异步等待它们。
https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx示例中的LimitedConcurrencyLevelTaskScheduler
也是一种基于线程/锁的方法
我考虑过维护最多 6 个任务的队列,并围绕它进行WhenAny
循环,但这感觉就像重新发明方形轮子。
private List<Task> running = new List<Task>();
foreach(Task<T> task in tasks) {
var inner = TaskExtensions.Unwrap(t.ContinueWith(tt => processeventasync(tt.Result)));
running.Add(inner);
if (running.Count >= 6) {
var resulttask = await Task.WhenAny(running);
running.Remove(resulttask);
await resulttask;
//not sure if this await will schedule the next iteration
//of the loop asynchronously, or if the loop happily continues
//and the continuation has the rest of the loop body (nothing
}
}
去这里的正确方法是什么?
编辑:
SemaphoreSlim
WaitAsync
似乎非常合理。我来到以下看起来很奇怪的代码:
private async void Foo()
{
IEnumerable<Task<int>> tasks = gettasks();
var resulttasks = tasks.Select(ti => TaskExtensions.Unwrap(ti.ContinueWith(tt => processeventasync(tt.Result))));
var semaphore = new SemaphoreSlim(initialCount: 6);
foreach (Task task in resulttasks)
{
await semaphore.WaitAsync();
semaphore.Release();
}
}
在这里async void
在这里很臭,但它是一个无限循环;它永远不会返回(实际处理显然会有一些取消机制)。
光是体内的等待/释放看起来真的很奇怪,但看起来这实际上是对的。这是一种没有隐藏陷阱的合理方法吗?
SemaphoreSlim.WaitAsync
来限制并发性。
看起来真的很奇怪,只是体内的等待/释放,但是 看起来这实际上是对的
您目前的方法实际上没有任何作用。这些任务根本不受SemaphoreSlim
的影响,因为您可以使用 Enumerable.Select
并发调用它们。
您需要监控Select
内的信号量:
private const int ConcurrencyLimit = 6;
SemaphoreSlim semaphoreSlim = new SemaphoreSlim(ConcurrencyLimit);
public async Task FooAsync()
{
var tasks = GetTasks();
var sentTasks = tasks.Select(async task =>
{
await semaphoreSlim.WaitAsync();
try
{
await ProcessEventAsync(await task);
}
finally
{
semaphoreSlim.Release();
}
});
await Task.WhenAll(sentTasks);
}
private Task ProcessEventAsync(T someEvent)
{
// Process event.
}
TPL Dataflow的ActionBlock<T>
。
定义处理事件的操作块,然后将要处理的项目发布到此块。您还可以设置最大并行度。
var block = new ActionBlock<string>(str =>
{
//save in db
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 6
});
var sendings = new List<Task<bool>>
{
block.SendAsync("a"),
block.SendAsync("b"),
block.SendAsync("c")
};
await Task.WhenAll(sendings);
block.Complete(); // tell the block we're done sending messages
await block.Completion; // wait for messages to be processed