
本文关键字:异步 任务 运行 如何正确 并行 | 更新日期: 2023-09-27 18:35:34

如果需要并行运行多个异步 I/O 任务,但必须确保同时运行的 I/O 操作不超过 X 个,而 I/O 之前的预处理任务和 I/O 之后的后处理任务不受此限制,该怎么办?

下面是一个场景:假设有 1000 个任务;每个任务都接受一个文本字符串作为输入参数,先转换该文本(I/O 之前的预处理),然后将转换后的文本写入文件。目标是让预处理逻辑充分利用 100% 的 CPU/内核,而任务的 I/O 部分以最大为 10 的并行度运行(同一时刻最多同时打开 10 个文件进行写入)。

能否提供一个使用 C#/.NET 4.5 实现的示例代码?



我认为使用 TPL 数据流(TPL Dataflow)来解决这个问题是个好主意:创建并行度不受限制的预处理块和后处理块,以及并行度受限的文件写入块,然后将它们链接在一起。例如:

// Three-stage dataflow pipeline:
//   pre-process (unbounded parallelism)
//   -> write to file (at most 10 concurrent writes)
//   -> post-process (unbounded parallelism).
var unboundedParallelismOptions =
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };

// Stage 1: transforms each input string via PreProcess, with no parallelism limit.
var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions);

// Stage 2: writes each string via WriteToFile and passes the same string on.
// This is the only throttled stage: no more than 10 writes are in flight at once.
var writeToFileBlock = new TransformBlock<string, string>(
    async s =>
    {
        await WriteToFile(s);
        return s;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

// Stage 3: consumes each written string via PostProcess, with no parallelism limit.
var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions);

// PropagateCompletion makes completion (or a fault) of an upstream block
// flow to the block it is linked to.
var propagateCompletionOptions =
    new DataflowLinkOptions { PropagateCompletion = true };
preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);

// use something like await preProcessBlock.SendAsync("text") here

// Signal that no more input will arrive. Without this the head block never
// completes, so the awaited Completion task below would never finish.
preProcessBlock.Complete();
await postProcessBlock.Completion;


/// <summary>
/// Asynchronously writes <paramref name="s"/> to the file whose name is returned by <c>GetFileName()</c>.
/// </summary>
/// <param name="s">The text to write.</param>
/// <returns>A task that completes after the text has been written and the writer has been disposed.</returns>
private static async Task WriteToFile(string s)
{
    // The using block disposes the writer even if the write fails.
    using (var writer = new StreamWriter(GetFileName()))
    {
        await writer.WriteAsync(s);
    }
}




    /// <summary>
    /// Concurrently executes an async action for each item of an <see cref="IEnumerable{T}"/>,
    /// optionally limiting how many actions may run at the same time.
    /// </summary>
    /// <typeparam name="T">Type of the items in the sequence.</typeparam>
    /// <param name="enumerable">The sequence whose items are passed to <paramref name="action"/>.</param>
    /// <param name="action">An async delegate to execute for each item.</param>
    /// <param name="maxDegreeOfParallelism">Optional. The maximum number of actions allowed to run
    /// concurrently; must be greater than 0. When <c>null</c>, every action is started without throttling.</param>
    /// <returns>A task that completes when the action has completed for every item.</returns>
    /// <exception cref="ArgumentOutOfRangeException">If <paramref name="maxDegreeOfParallelism"/> is less than 1.</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (!maxDegreeOfParallelism.HasValue)
        {
            // No limit requested: start every action and wait for all of them.
            await Task.WhenAll(enumerable.Select(item => action(item)));
            return;
        }

        if (maxDegreeOfParallelism.Value < 1)
        {
            throw new ArgumentOutOfRangeException(
                "maxDegreeOfParallelism",
                maxDegreeOfParallelism.Value,
                "The maximum degree of parallelism must be greater than 0.");
        }

        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();

            foreach (var item in enumerable)
            {
                // Take a slot; this waits while the number of running actions is at the limit.
                await semaphoreSlim.WaitAsync();

                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    try
                    {
                        await action(item);
                    }
                    finally
                    {
                        // Free the slot whether the action succeeded or failed. Using
                        // try/finally (rather than ContinueWith) keeps a failure of the
                        // action observable through the task awaited below.
                        semaphoreSlim.Release();
                    }
                }));
            }

            // Wait for all started actions; rethrows if any of them failed.
            await Task.WhenAll(tasksWithThrottler);
        }
    }


// Usage example: invokes SomeAsyncMethod for every item of `enumerable`
// through the ForEachAsyncConcurrent extension method.
// NOTE(review): the end of this statement (the lambda's closing and any
// maxDegreeOfParallelism argument) is not visible here; confirm before compiling.
await enumerable.ForEachAsyncConcurrent(
    async item =>
        await SomeAsyncMethod(item);