TPL 数据流异步调度
本文关键字:调度 异步 数据流 TPL | 更新日期: 2023-09-27 18:32:23
async
Task
s的调度在TPL数据流中没有像我预期的那样工作。在下面的示例中,我希望ActionBlock
尽快处理来自TransformBlock
的数据。但它正在等待第二个(延迟的)结果,然后再进入第三个结果。我在这里误解了什么?对处理顺序有要求吗?
public class TestDataFlow
{
public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
public async Task Flow()
{
watch.Start();
var plus10 = new TransformBlock<int, int>(async input =>
{
if (input == 2)
{
await Task.Delay(5000);
}
Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
return input + 10;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
});
var printSolution = new ActionBlock<int>(input =>
{
Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
});
plus10.LinkTo(printSolution);
List<int> inputs = new List<int> { 1, 2, 3 };
foreach (var input in inputs)
{
await plus10.SendAsync(input);
}
}
}
输出:
Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834
TPL 数据流保证输入和输出队列的顺序,无论并行处理多少项。
"由于每个预定义的源数据流块类型都保证消息按接收顺序传播出去,因此必须先从源块读取每条消息,然后源块才能处理下一条消息"
从数据流(任务并行库)
如果您希望项目在完成处理后移动到下一个块,您应该自己明确地传输它们,这会将您的TransformBlock
变成ActionBlock
:
var printSolution = new ActionBlock<int>(input =>
{
Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
},executionDataflowBlockOptions);
var plus10 = new ActionBlock<int>(async input =>
{
if (input == 2)
{
await Task.Delay(5000);
}
Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);
截至(至少)System.Threading.Tasks.Dataflow.4.6.0
,ExecutionDataflowBlockOptions
现在有一个属性EnsureOrdered
可以设置为 false
。
要更新:
Install-Package System.Threading.Tasks.Dataflow
法典:
var options = new ExecutionDataflowBlockOptions {
EnsureOrdered = false
};
var transform = new TransformBlock<int, int>(i => Transform(i), options);
更多示例:https://stackoverflow.com/a/38865414/625919
发展历史,我认为很整洁:https://github.com/dotnet/corefx/issues/536 https://github.com/dotnet/corefx/pull/5191