理解TPL数据流的并行度排序
本文关键字:并行度 排序 数据流 TPL 理解 | 更新日期: 2023-09-27 18:04:17
我正在阅读数据流(任务并行库),其中有一部分说:
当您指定的最大并行度大于1时,将同时处理多个消息,因此,消息可能不会按照接收到的顺序进行处理。然而,从块中输出消息的顺序将被正确排序。
这是什么意思?
示例,我将动作块设置为并行度= 5:
testActionBlock = new ActionBlock<int>(i => Consumer(i),
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 5
});
await Producer();
testActionBlock.Completion.Wait();
My Producer()基本上将数字排队到块中:
private async Task Producer()
{
for (int i=0; i<= 1000; i++)
{
await testActionBlock.SendAsync(i);
}
testActionBlock.Complete();
}
和我的消费者(i)只是写出以下行:
private async Task Consumer(int i)
{
if (i == 1)
{
await Task.Delay(5000);
}
Console.WriteLine(i);
}
这是否意味着消费者(2)将被阻塞,直到消费者(1)完成处理(因为有5秒延迟)?我测试了代码,似乎并不是这样。即使我删除了5秒的延迟,我也没有看到输出是有序的。
(更新)bBlock = new BufferBlock<int>(option);
testActionBlock = new ActionBlock<int>(i => Consumer(i),
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 5
});
bBlock.LinkTo(testActionBlock);
await Producer();
testActionBlock.Completion.Wait();
My Producer()现在将添加到block:
private async Task Producer()
{
for (int i=0; i<= 1000; i++)
{
await bBlock.SendAsync(i);
}
bBlock.Complete();
}
那么,在这种情况下,消费者(1)将等待5秒,然后消费者(2)才能继续?
No。你可以把DoP想象成线程(不完全是,但很容易理解)
所以在5时,它会尝试一次处理5个。因为1号花了5秒,2号肯定会先完成。3、4、5条很可能也是如此。甚至可能是#6(因为#2已经完成了,DoP将允许它从#6开始)
即使没有延迟,也不能保证处理的顺序。所以永远不要依赖于的顺序,他们执行。话虽如此,当您使用消息输出(不是打印,因为这是它们执行的顺序)时,它们将按照它们进入的顺序重新排序,即使它们以任意顺序执行。
DataflowBlockOptions
类包含一个可配置的属性EnsureOrdered
:
获取或设置一个值,该值指示是否应该在块对消息的处理中强制执行有序处理。
此属性确定块是否将以接收到的相同顺序输出处理过的消息,默认情况下是true
。因此,像TransformBlock
和TransformManyBlock
这样产生输出的块(实现ISourceBlock<TOutput>
接口)在将接收到的消息传播到它们的目标块(它们链接到的ITargetBlock<TInput>
块)时,保留了它们的原始顺序。
EnsureOrdered
选项与处理消息的顺序无关。例如,将属性MaxDegreeOfParallelism
设置为值DataflowBlockOptions.Unbounded
意味着所有接收到的消息将在到达时立即被调度执行,并且-如果ThreadPool
有足够的线程可用-所有这些消息将立即开始执行。将EnsureOrdered
设置为false
将产生这样的效果:一旦消息的执行完成,它将有资格向下游传播,即使先前收到的消息的执行尚未完成。
EnsureOrdered
选项对ActionBlock
s没有影响,因为这些块不产生输出。它对BufferBlock
s也没有影响,因为尽管这些块产生输出,但它们不做任何处理,所以不会发生任何可能扭曲它们接收到的消息的原始顺序的事情。简而言之,此属性仅对产生输出并执行处理的块有效。