C# Parallel ForEach + async

Keywords: async, ForEach, parallel | Updated: 2023-09-27 17:56:28

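I am processing a list of items (200k to 300k), and each item takes between 2 and 8 seconds to process. To save time, I process this list in parallel. Since I am in an async context, I use something like this: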

public async Task<List<Keyword>> DoWord(List<string> keyword)
{
    ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>();
    if (keyword.Count > 0)
    {
        try
        {
            var tasks = keyword.Select(async kw =>
            {
                return await Work(kw).ConfigureAwait(false);
            });
            keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false));
        }
        catch (Exception ex)
        {
            // 'await Task.WhenAll' rethrows the first failed task's exception,
            // not an AggregateException, so catching AggregateException never fires.
            log.ErrorFormat("Core threads exception: {0}", ex);
        }
    }
    return keywordResults.ToList();
}

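The keyword list always contains 8 elements (passed in from the caller above), so I process my list 8 at a time. But I think that if 7 keywords finish in 3 seconds and the 8th takes 10 seconds, the 8 keywords together will take 10 seconds in total (correct me if I'm wrong). So how can I approach this the way Parallel.ForEach does? What I mean is: as soon as one keyword finishes, start another one, and then another. That way I would always have 8 work items running. Any ideas?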
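The timing reasoning in the question can be checked with a small simulation. This is only a sketch: it does no real work, it just computes elapsed time from made-up per-item durations (in seconds), and the ScheduleSimulation class and its method names are hypothetical. It compares fixed batches of 8 with a sliding window where a new item starts as soon as any of the 8 slots frees up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: simulates the total elapsed time of two scheduling strategies
// for a list of per-item durations (in seconds). No real work is performed.
public static class ScheduleSimulation
{
    // Fixed batches of 'width' items: each batch lasts as long as its slowest item,
    // and the next batch only starts when the whole batch has finished.
    public static int BatchMakespan(IReadOnlyList<int> durations, int width)
    {
        int total = 0;
        for (int i = 0; i < durations.Count; i += width)
            total += durations.Skip(i).Take(width).Max();
        return total;
    }

    // Sliding window of 'width' workers: each item starts on whichever worker
    // becomes free first, so no worker sits idle while items remain.
    public static int SlidingWindowMakespan(IReadOnlyList<int> durations, int width)
    {
        var freeAt = new int[width]; // time at which each worker becomes free
        foreach (int d in durations)
        {
            int worker = Array.IndexOf(freeAt, freeAt.Min()); // earliest-free worker
            freeAt[worker] += d;
        }
        return freeAt.Max();
    }
}
```

For seven 3-second items plus one 10-second item, both methods return 10, so the estimate in the question is right. With two such groups (16 items), BatchMakespan returns 20 while SlidingWindowMakespan returns 16, because the seven fast slots pick up the second group while the first 10-second item is still running.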

Another, simpler approach is to use the AsyncEnumerator NuGet package:

using System.Collections.Async; // older AsyncEnumerator releases; newer releases use the Dasync.Collections namespace
public async Task<List<Keyword>> DoWord(List<string> keywords)
{
    var keywordResults = new ConcurrentBag<Keyword>();
    await keywords.ParallelForEachAsync(async keyword =>
    {
        try
        {
            var result = await Work(keyword);
            keywordResults.Add(result);
        }
        catch (Exception ex)
        {
            // 'await' rethrows the original exception, not an AggregateException,
            // so catching AggregateException here would never fire.
            log.ErrorFormat("Core threads exception: {0}", ex);
        }
    }, maxDegreeOfParallelism: 8);
    return keywordResults.ToList();
}
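On .NET 6 and later, the built-in Parallel.ForEachAsync gives the same "keep 8 running" behaviour without a third-party package. A minimal sketch, assuming a Work(string) method that returns Task<Keyword> as in the question; the Keyword class, the stub Work, and the KeywordProcessor name below are placeholders for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Keyword { public string Name = ""; } // placeholder type

public static class KeywordProcessor
{
    // Placeholder for the question's Work method: just a short async delay.
    public static async Task<Keyword> Work(string s)
    {
        await Task.Delay(10 + s.Length * 10);
        return new Keyword { Name = s };
    }

    public static async Task<List<Keyword>> DoWork(IEnumerable<string> keywords, int maxParallel = 8)
    {
        var results = new ConcurrentBag<Keyword>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // At most 'maxParallel' bodies run at once; a new keyword starts as soon as one finishes.
        // cancellationToken is unused here; it could be passed on if Work accepted one.
        await Parallel.ForEachAsync(keywords, options, async (kw, cancellationToken) =>
        {
            Keyword result = await Work(kw);
            results.Add(result);
        });
        return results.ToList();
    }
}
```

If any body throws, Parallel.ForEachAsync stops starting new items and the awaited call rethrows, so wrap the Work call in try/catch inside the body if you want to log failures and continue, as the answer above does.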

Here is some sample code that shows how to use TPL Dataflow to solve this problem.

Note that to compile it, you need to add TPL Dataflow (the System.Threading.Tasks.Dataflow package) to the project via NuGet.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Demo
{
    class Keyword // Dummy test class.
    {
        public string Name;
    }
    class Program
    {
        static void Main()
        {
            // Dummy test data.
            var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList();
            var result = DoWork(keywords).Result;
            Console.WriteLine("---------------------------------");
            foreach (var item in result)
                Console.WriteLine(item.Name);
        }
        public static async Task<List<Keyword>> DoWork(List<string> keywords)
        {
            var input = new TransformBlock<string, Keyword>
            (
                async s => await Work(s),
                // Maximum number of Work calls in flight at once (async calls, not dedicated threads).
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }
            );
            var result = new List<Keyword>();
            var output = new ActionBlock<Keyword>
            (
                item => result.Add(item),   // Output only 1 item at a time, because 'result.Add()' is not threadsafe.
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }
            );
            input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });
            foreach (string s in keywords)
                await input.SendAsync(s);
            input.Complete();
            await output.Completion;
            return result;
        }
        public static async Task<Keyword> Work(string s) // Stubbed test method.
        {
            Console.WriteLine("Processing " + s);
            int delay;
            lock (rng) { delay = rng.Next(10, 1000); }
            await Task.Delay(delay); // Simulate load.
            Console.WriteLine("Completed " + s);
            return new Keyword { Name = s }; // No need for Task.Run to build a plain object.
        }
        static Random rng = new Random();
    }
}
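If neither a NuGet package nor .NET 6 is available, a common dependency-free alternative (a different technique from the Dataflow answer) is to throttle Task.WhenAll with a SemaphoreSlim: every item gets a task, but each task waits for one of 8 slots before calling Work. This is a sketch; ThrottledWork and WhenAllThrottled are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledWork
{
    // Runs 'work' for every item, with at most 'maxParallel' calls in flight at any moment.
    // Results come back in the same order as 'items'.
    public static async Task<TResult[]> WhenAllThrottled<TItem, TResult>(
        IEnumerable<TItem> items, Func<TItem, Task<TResult>> work, int maxParallel)
    {
        using var gate = new SemaphoreSlim(maxParallel, maxParallel);
        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync();          // wait for a free slot
            try { return await work(item); }
            finally { gate.Release(); }      // free the slot so the next waiting item can start
        }).ToList();                         // start all tasks now; most just wait on the gate
        return await Task.WhenAll(tasks);
    }
}
```

In the question's code this would replace the Select/Task.WhenAll pair, e.g. await ThrottledWork.WhenAllThrottled(keyword, Work, 8). Unlike the Dataflow and ParallelForEachAsync versions, it creates a task object for every item up front (for 200k to 300k items that costs memory), but it keeps the results in input order.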