使用并行.ForEach方法返回task -避免task . waitall

本文关键字:task 避免 waitall 返回 方法 并行 ForEach | 更新日期: 2023-09-27 18:10:00

我有一个方法,它采用IWorkItem,开始工作并返回相关任务。由于使用了外部库,该方法必须看起来像这样。

public Task WorkOn(IWorkItem workItem)
{
    //...start asynchronous operation, return task
}

我想在多个工作项上做这项工作。我不知道有多少人会去,也许有1个,也许1万。WorkOn方法具有内部池,如果达到太多并行执行,可能需要等待。(如SemaphoreSlim.Wait):

public Task WorkOn(IWorkItem workItem)
{
    _semaphoreSlim.Wait();
}

我当前的解决方案是:

public void Do(params IWorkItem[] workItems)
{
    var tasks = new Task[workItems.Length];
    for (var i = 0; i < workItems.Length; i++)
    {
        tasks[i] = WorkOn(workItems[i]);
    }
    Task.WaitAll(tasks);
}

问题:我可以用some Parallel吗?在这种情况下,每个人?为了避免创建10000个任务,然后由于WorkOn的节流而等待?

使用并行.ForEach方法返回task -避免task . waitall

其实没那么容易。您可以使用Parallel.ForEach来限制生成的任务数量。但我不确定它在你的情况下会有什么表现。

作为一般的经验法则,我通常尽量避免将TaskParallel混合使用。

你当然可以这样做:

public void Do(params IWorkItem[] workItems)
{
    Parallel.ForEach(workItems, (workItem) => WorkOn(workItem).Wait());
}

在"正常"条件下,这应该可以很好地限制并发性。

您也可以完全使用async - await,并使用一些技巧对您的并发性进行一些限制。但是在这种情况下,你必须自己限制并发性。

const int ConcurrencyLimit = 8;
public async Task Do(params IWorkItem[] workItems)
{
    var cursor = 0;
    var currentlyProcessing = new List<Task>(ConcurrencyLimit);
    while (cursor < workItems.Length)
    {
        while (currentlyProcessing.Count < ConcurrencyLimit && cursor < workItems.Length)
        {
            currentlyProcessing.Add(WorkOn(workItems[cursor]));
            cursor++;
        }
        Task finished = await Task.WhenAny(currentlyProcessing);
        currentlyProcessing.Remove(finished);
    }
    await Task.WhenAll(currentlyProcessing);
}

正如我所说的…要复杂得多。但它也会将并发性限制为您所应用的任何值。此外,它正确地使用了async - await模式。如果你不想要非阻塞多线程,你可以很容易地将这个函数包装到另一个函数中,并对这个函数返回的任务执行阻塞.Wait

这个实现的关键是Task.WhenAny函数。这个函数将在应用的任务列表中返回一个已完成的任务(由await的另一个任务包装)。