C# Parallel.ForEach + async
Keywords: async, ForEach, parallel | Updated: 2023-09-27 17:56:28
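I'm processing a list of items (200k - 300k), and each item takes between 2 and 8 seconds to process. To save time, I can process this list in parallel. Since I'm in an async context, I use something like this: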
    public async Task<List<Keyword>> DoWord(List<string> keyword)
    {
        ConcurrentBag<Keyword> keywordResults = new ConcurrentBag<Keyword>();
        if (keyword.Count > 0)
        {
            try
            {
                var tasks = keyword.Select(async kw =>
                {
                    return await Work(kw).ConfigureAwait(false);
                });
                keywordResults = new ConcurrentBag<Keyword>(await Task.WhenAll(tasks).ConfigureAwait(false));
            }
            catch (AggregateException ae)
            {
                foreach (Exception innerEx in ae.InnerExceptions)
                {
                    log.ErrorFormat("Core threads exception: {0}", innerEx);
                }
            }
        }
        return keywordResults.ToList();
    }
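The keyword list always contains 8 elements (passed in from the caller), so I process my list 8 by 8. In this case, though, I guess that if 7 keywords are processed in 3 seconds and the 8th takes 10 seconds, the total time for the 8 keywords will be 10 seconds (correct me if I'm wrong). How can I get closer to the behavior of Parallel.ForEach? I mean: start 8 keywords, and as soon as 1 finishes, start 1 more. That way I would permanently have 8 workers running. Any ideas?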
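The timing assumption in the question can be checked with a small sketch. FakeWork below is a hypothetical stand-in for Work(kw), and the durations are scaled down from seconds to milliseconds; neither comes from the original code. With Task.WhenAll, a batch should take roughly as long as its slowest item, so the seven fast slots sit idle while the eighth finishes.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

static class BatchTimingDemo
{
    // Hypothetical stand-in for Work(kw): it only waits for the given time.
    static async Task<int> FakeWork(int milliseconds)
    {
        await Task.Delay(milliseconds);
        return milliseconds;
    }

    // Runs one batch with Task.WhenAll and returns the elapsed wall-clock time.
    public static async Task<TimeSpan> TimeBatchAsync(int[] durationsMs)
    {
        var sw = Stopwatch.StartNew();
        await Task.WhenAll(durationsMs.Select(ms => FakeWork(ms)));
        sw.Stop();
        return sw.Elapsed;
    }

    static async Task Main()
    {
        // Seven fast items and one slow item (assumed values for illustration).
        var durations = new[] { 30, 30, 30, 30, 30, 30, 30, 100 };
        TimeSpan elapsed = await TimeBatchAsync(durations);
        Console.WriteLine($"Batch took about {elapsed.TotalMilliseconds:F0} ms");
    }
}
```

The elapsed time should land near the slowest duration rather than near the sum, which is why batching 8 by 8 wastes capacity when item times vary.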
Another, simpler way is to use the AsyncEnumerator NuGet package:
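    using System;
    using System.Collections.Async; // namespace in older package versions; newer releases use Dasync.Collections
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public async Task<List<Keyword>> DoWord(List<string> keywords)
    {
        var keywordResults = new ConcurrentBag<Keyword>();
        await keywords.ParallelForEachAsync(async keyword =>
        {
            try
            {
                var result = await Work(keyword);
                keywordResults.Add(result);
            }
            catch (Exception ex)
            {
                // 'await' rethrows the original exception rather than an AggregateException,
                // so catch Exception here and log it per keyword.
                log.ErrorFormat("Core threads exception: {0}", ex);
            }
        }, maxDegreeOfParallelism: 8);
        return keywordResults.ToList();
    }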
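If adding a package is not an option, the same "always 8 in flight" behavior can be sketched with SemaphoreSlim from the base class library. This is only a minimal sketch: RunThrottledAsync and FakeWork are hypothetical names introduced here, and FakeWork stands in for the real Work(kw). Note that it creates one Task per item up front, which may matter for a list of 200k to 300k items.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleDemo
{
    // Runs 'work' for every item, keeping at most 'maxConcurrency' calls in flight.
    public static async Task<List<TResult>> RunThrottledAsync<TItem, TResult>(
        IEnumerable<TItem> items, Func<TItem, Task<TResult>> work, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                await gate.WaitAsync();      // wait for a free slot
                try
                {
                    return await work(item);
                }
                finally
                {
                    gate.Release();          // free the slot as soon as this item is done
                }
            }).ToList();                     // ToList materializes the query so each task starts once

            // Task.WhenAll returns results in the same order as the input items.
            return (await Task.WhenAll(tasks)).ToList();
        }
    }

    // Hypothetical stand-in for Work(kw).
    static async Task<string> FakeWork(int n)
    {
        await Task.Delay(20);
        return "kw" + n;
    }

    static async Task Main()
    {
        var results = await RunThrottledAsync(Enumerable.Range(1, 20), FakeWork, 8);
        Console.WriteLine($"Processed {results.Count} items");
    }
}
```

Because a slot is released the moment an item completes, a slow item only occupies one of the 8 slots instead of stalling a whole batch.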
Here is some sample code that shows how to solve this using TPL Dataflow.
Note that in order to compile it, you need to add TPL Dataflow to your project via NuGet.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace Demo
    {
        class Keyword // Dummy test class.
        {
            public string Name;
        }

        class Program
        {
            static void Main()
            {
                // Dummy test data.
                var keywords = Enumerable.Range(1, 100).Select(n => n.ToString()).ToList();
                var result = DoWork(keywords).Result;
                Console.WriteLine("---------------------------------");
                foreach (var item in result)
                    Console.WriteLine(item.Name);
            }

            public static async Task<List<Keyword>> DoWork(List<string> keywords)
            {
                var input = new TransformBlock<string, Keyword>
                (
                    async s => await Work(s),
                    // This is where you specify the max number of items processed concurrently.
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }
                );

                var result = new List<Keyword>();
                var output = new ActionBlock<Keyword>
                (
                    item => result.Add(item), // Output only 1 item at a time, because 'result.Add()' is not threadsafe.
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }
                );

                input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

                foreach (string s in keywords)
                    await input.SendAsync(s);

                input.Complete();
                await output.Completion;
                return result;
            }

            public static async Task<Keyword> Work(string s) // Stubbed test method.
            {
                Console.WriteLine("Processing " + s);
                int delay;
                lock (rng) { delay = rng.Next(10, 1000); } // Random is not threadsafe.
                await Task.Delay(delay); // Simulate load.
                Console.WriteLine("Completed " + s);
                return new Keyword { Name = s };
            }

            static Random rng = new Random();
        }
    }
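On .NET 6 or later (an assumption, since the question does not state a framework version), the built-in Parallel.ForEachAsync gives the same throttled behavior without extra packages. The sketch below uses a hypothetical FakeWork in place of Work(kw) and plain strings in place of Keyword.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ForEachAsyncDemo
{
    // Hypothetical stand-in for Work(kw).
    static async Task<string> FakeWork(string s, CancellationToken ct)
    {
        await Task.Delay(20, ct);
        return s.ToUpperInvariant();
    }

    public static async Task<List<string>> DoWorkAsync(IEnumerable<string> keywords)
    {
        // ConcurrentBag is safe to fill from several workers at once; it does not keep input order.
        var results = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 8 };

        // At most 8 bodies run at a time; a new item starts as soon as one finishes.
        await Parallel.ForEachAsync(keywords, options, async (kw, ct) =>
        {
            results.Add(await FakeWork(kw, ct));
        });

        return results.ToList();
    }

    static async Task Main()
    {
        var keywords = Enumerable.Range(1, 30).Select(n => "kw" + n);
        var results = await DoWorkAsync(keywords);
        Console.WriteLine($"Processed {results.Count} keywords");
    }
}
```

Unlike the Dataflow version, this does not preserve the order of the input, so sort the results afterwards if order matters.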