C# 任务线程池 - 仅跨 10 个线程运行 100 个任务

本文关键字:线程 任务 运行 仅跨 | 更新日期: 2023-09-27 18:37:00

我只是想知道是否有人可以指出有关异步/等待框架和线程池的正确方向?

基本上,我要做的是在单独的线程/异步中执行 x 个操作,但最多跨 y 个线程。

例如,假设我有 100 个数据库操作: await _repository.WriteData(someData);

我想做的是有一些方法一次运行 10 个这样的操作(理想情况下,每个操作在单独的线程中,所以 10 个线程),当每个操作完成时,下一个在线程上启动,然后变得可用。然后,我们等待所有操作完成,所有线程完成...

这是无需太多努力或增加大量复杂性即可轻松实现的吗?

C# 任务线程池 - 仅跨 10 个线程运行 100 个任务

我认为您关注线程没有抓住重点,尤其是对于不需要线程执行的异步操作。

.NET 有一个很好的ThreadPool你可以使用。你不知道里面有多少线程,你不在乎。它只是工作(直到它没有并且您需要自己配置它,但这是非常先进的)。

ThreadPool上运行任务非常简单。为每个操作创建一个任务并使用SemaphoreSlim限制它们,或使用现成的 TPL 数据流块。例如:

var block = new ActionBlock<SomeData>(
    _ => _repository.WriteDataAsync(_), // What to do on each item
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 }); // How many items at the same time
foreach (var item in items)
{
    block.Post(item); // Post all items to the block
}
block.Complete(); // Signal completion
await block.Completion; // Asynchronously wait for completion.

但是,如果您确实计划创建"专用"线程,则可以将Task.Factory.StartNewLongRunning选项一起使用,该选项可在ThreadPool之外创建专用线程。但请记住,异步操作不会在整个操作过程中保持相同的线程,因为异步操作不需要线程。因此,从专用线程开始可能毫无意义(在我的博客上有更多内容:LongRun 对于 Task.Run with async-await 毫无用处)

@i3arnon的答案是正确的。使用 TPL 数据流。

本答案的其余部分仅用于教育目的和/或特殊用例。

最近在一个项目中遇到了类似的问题,我无法引入任何外部依赖项,所以我不得不推出自己的负载平衡实现,结果证明它非常简单(直到您开始连接取消和排序结果 - 但这超出了这个问题的范围)。

我忽略了"10 个专用线程"的要求,因为正如其他人已经解释的那样,它在处理异步操作时没有意义。相反,我将维护最多 N 个处理工作负载的并发Task实例。

static async Task InvokeAsync(IEnumerable<Func<Task>> taskFactories, int maxDegreeOfParallelism)
{
    Queue<Func<Task>> queue = new Queue<Func<Task>>(taskFactories);
    if (queue.Count == 0) {
        return;
    }
    List<Task> tasksInFlight = new List<Task>(maxDegreeOfParallelism);
    do
    {
        while (tasksInFlight.Count < maxDegreeOfParallelism && queue.Count != 0)
        {
            Func<Task> taskFactory = queue.Dequeue();
            tasksInFlight.Add(taskFactory());
        }
        Task completedTask = await Task.WhenAny(tasksInFlight).ConfigureAwait(false);
        // Propagate exceptions. In-flight tasks will be abandoned if this throws.
        await completedTask.ConfigureAwait(false);
        tasksInFlight.Remove(completedTask);
    }
    while (queue.Count != 0 || tasksInFlight.Count != 0);
}

用法:

Func<Task>[] taskFactories = {
    () => _repository.WriteData(someData1),
    () => _repository.WriteData(someData2),
    () => _repository.WriteData(someData3),
    () => _repository.WriteData(someData4)
};
await InvokeAsync(taskFactories, maxDegreeOfParallelism: 2);

。或

IEnumerable<SomeData> someDataCollection = ... // Get data.
await ParallelTasks.InvokeAsync(
    someDataCollection.Select(someData => new Func<Task>(() => _repository.WriteData(someData))),
    maxDegreeOfParallelism: 10
);

解决方案不会受到负载均衡不良问题的影响,这在其他琐碎的实现中很常见,在任务具有不同持续时间且输入已预先分区的情况下(例如此)。

具有性能优化和参数验证的版本:要点。