如何使用具有负载平衡和有限并行度的任务并行库
本文关键字:并行度 任务 并行 何使用 有负载 平衡 | 更新日期: 2023-09-27 18:24:13
我的任务是通过使用(async)接口向外部系统写入一个已知的nr值。我必须限制并发执行的并行写入的最大数量。此外,我必须使用负载平衡,因为外部系统可能需要更长的时间来编写一些值。
我知道如何单独解决这些问题:
并行度:
new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites}
我还偶然发现了这篇文章:http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx
负载平衡:
var partitioner = Partitioner.Create(values.ToList(), true);
来自异步接口的任务:
var writeTask = Task<AccessResult>.Factory.FromAsync(BeginWriteValue, EndWriteValue, value.SystemId, value.Xml, priority, null);
但是我该如何正确地结合所有这些技术呢?我创建了以下代码:
int maxNrParallelWrites = GetMaxNrParallelWrites();
var partitioner = Partitioner.Create(values.ToList(), true);
Parallel.ForEach(partitioner, new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites},
(val) =>
{
var writeValueTask = GetWriteValueTask(val, priority);
Task.WaitAny(writeValueTask);
});
我特别不确定前一段代码的最后一部分:执行工作负载的操作。与其直接创建WriteValueTask,不如使用这样的同步接口:
(val) =>
{
var accessResult = externalSystem.WriteValue(....);
}
还是可以创建一个任务,然后直接等待它(task.WetAny(…))?
您应该使用TPL Dataflow的ActionBlock
来封装所有这些。这是一个基于参与者的框架,是TPL:的一部分
var block = new ActionBlock<Value>(
value => GetWriteValueTask(value, priority)
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = GetMaxNrParallelWrites();
});
foreach (var value in values)
{
block.Post(value);
}
您可以设置MaxDegreeOfParallelism
、BoundedCapacity
,并烘焙负载平衡,因为它一次只处理MaxDegreeOfParallelism
个项目,当每个项目完成时,它处理下一个项目(而不是使用提前对集合进行分区的Partitioner
)
注意:当您执行async
任务并等待它同步完成(即Task.WaitAny
)时,实际上没有什么是异步的。在这种情况下,您应该使用Task.WhenAny
。
本文中有一个关于如何创建负载平衡ForEachASync
方法的好例子。。我去掉了Task.Run
以避免启动新线程,然后扩展方法变成了这样:
public static class Extensions
{
public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
{
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select ExecuteInPartition(partition, body));
}
}
用法
此示例一次异步处理最多100封电子邮件
// Process 100 emails at a time
return emailsToProcess.ForEachAsync(100, ProcessSingleEmail);