理解TPL数据流的并行度排序

本文关键字:并行度 排序 数据流 TPL 理解 | 更新日期: 2023-09-27 18:04:17

我正在阅读数据流(任务并行库),其中有一部分说:

当您指定的最大并行度大于1时,将同时处理多个消息,因此,消息可能不会按照接收到的顺序进行处理。然而,从块中输出消息的顺序将被正确排序。

这是什么意思?

示例,我将动作块设置为并行度= 5:

testActionBlock = new ActionBlock<int>(i => Consumer(i),
            new ExecutionDataflowBlockOptions()
            {
                MaxDegreeOfParallelism = 5
            });
await Producer();
testActionBlock.Completion.Wait();

My Producer()基本上将数字排队到块中:

private async Task Producer()
{
    for (int i=0; i<= 1000; i++)
    {
        await testActionBlock.SendAsync(i);
    }
    testActionBlock.Complete();
}

和我的消费者(i)只是写出以下行:

private async Task Consumer(int i)
{
    if (i == 1)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine(i);
}

这是否意味着消费者(2)将被阻塞,直到消费者(1)完成处理(因为有5秒延迟)?我测试了代码,似乎并不是这样。即使我删除了5秒的延迟,我也没有看到输出是有序的。

(更新)

bBlock = new BufferBlock<int>(option);
testActionBlock = new ActionBlock<int>(i => Consumer(i),
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 5
    });
bBlock.LinkTo(testActionBlock);
await Producer();
testActionBlock.Completion.Wait();

My Producer()现在将添加到block:

private async Task Producer()
{
    for (int i=0; i<= 1000; i++)
    {
        await bBlock.SendAsync(i);
    }
    bBlock.Complete();
}

那么,在这种情况下,消费者(1)将等待5秒,然后消费者(2)才能继续?

理解TPL数据流的并行度排序

No。你可以把DoP想象成线程(不完全是,但很容易理解)

所以在5时,它会尝试一次处理5个。因为1号花了5秒,2号肯定会先完成。3、4、5条很可能也是如此。甚至可能是#6(因为#2已经完成了,DoP将允许它从#6开始)

即使没有延迟,也不能保证处理的顺序。所以永远不要依赖于的顺序,他们执行。话虽如此,当您使用消息输出(不是打印,因为这是它们执行的顺序)时,它们将按照它们进入的顺序重新排序,即使它们以任意顺序执行。

DataflowBlockOptions类包含一个可配置的属性EnsureOrdered:

获取或设置一个值,该值指示是否应该在块对消息的处理中强制执行有序处理。

此属性确定块是否将以接收到的相同顺序输出处理过的消息,默认情况下是true。因此,像TransformBlockTransformManyBlock这样产生输出的块(实现ISourceBlock<TOutput>接口)在将接收到的消息传播到它们的目标块(它们链接到的ITargetBlock<TInput>块)时,保留了它们的原始顺序。

EnsureOrdered选项与处理消息的顺序无关。例如,将属性MaxDegreeOfParallelism设置为值DataflowBlockOptions.Unbounded意味着所有接收到的消息将在到达时立即被调度执行,并且-如果ThreadPool有足够的线程可用-所有这些消息将立即开始执行。将EnsureOrdered设置为false将产生这样的效果:一旦消息的执行完成,它将有资格向下游传播,即使先前收到的消息的执行尚未完成。

EnsureOrdered选项对ActionBlock s没有影响,因为这些块不产生输出。它对BufferBlock s也没有影响,因为尽管这些块产生输出,但它们不做任何处理,所以不会发生任何可能扭曲它们接收到的消息的原始顺序的事情。简而言之,此属性仅对产生输出并执行处理的块有效。