使用LinkTo谓词的TPL数据流块

本文关键字:数据流 TPL LinkTo 谓词 使用 | 更新日期: 2023-09-27 18:20:59

我有一些块最终从一个TransformBlock转到基于LinkTo谓词的其他三个转换块之一。我正在使用DataflowLinkOptions来传播完成。问题是,当一个谓词被满足并且该块被启动时,我的管道的其余部分将继续。看起来管道应该等待这个块首先完成。

这个的代码是这样的:

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status = Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status = Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status = Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

现在,这并不像我所说的那样起作用,所以我找到的获得我想要的行为的唯一方法是取出linkOptions,并将以下内容添加到mainBlock的lambda中。

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);
    if (input.Status = Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status = Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status = Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }
    return input;
});

那么问题是,这是实现这一目标的唯一途径吗?

顺便说一句,这已经在我的单元测试中运行了,单个数据项通过它来尝试调试管道行为。每个块都通过多个单元测试进行了单独测试。因此,在我的管道单元测试中,断言在块完成执行之前就被命中了,因此失败了。

如果我删除block2和block3链接,并使用linkOptions调试测试,它可以正常工作。

使用LinkTo谓词的TPL数据流块

问题不在于问题中的代码,它可以正确工作:当主块完成时,所有三个后续块也都标记为完成。

问题出在结束块上:您在那里也使用PropagateCompletion,这意味着当前三个块中的任何完成时,结束块都被标记为完成。你想要的是,当所有三个块都完成时,将其标记为完成,并且答案中的Task.WhenAll().ContinueWith()组合完成了这一点(尽管该片段的第一部分是不必要的,但这与PropagateCompletion完全相同)。

事实证明,链接选项传播(至少这是我的猜测)将传播不满足linkTo中谓词的块的完成。

是的,它总是传播完成。完成没有任何关联项,所以将谓词应用于它没有任何意义。也许你总是只有一个项(这并不常见)这一事实会让你更困惑?

如果我的猜测是正确的,我会觉得这是链接选项完成传播中的错误或设计错误。如果一个块从未使用过,为什么它应该是完整的?

为什么不应该呢?对我来说,这是完全合理的:即使这次没有Status.Delayed的项目,你仍然希望完成处理这些项目的块,这样任何后续代码都可以知道所有延迟的项目都已经被处理了。事实上根本没有。


无论如何,如果你经常遇到这种情况,你可能想创建一个助手方法,将几个源块同时链接到一个目标块,并正确地传播完成:

public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }
    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}

用法:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);

好的。所以我首先要感谢科里。当我第一次读到他的评论时,我有点恼火,因为我觉得我的代码很好地说明了这个概念,可以很容易地转换成工作版本。但无论如何,我觉得有必要做一个完整的可测试版本,因为他的评论,我可以发布。

在我的测试中,令人惊讶的是,尽管它模仿了我的真实代码,但我认为会失败的路径通过了,而我认为会通过的路径也失败了。这让我有点晕头转向。所以我开始对原始代码进行更多的排列。基本上,我创建了同步的块和异步的块,并制作了这两种管道。总共四个,2个同步和2个异步,其中一个使用链接选项进行传播,另一个使用MainBlock中的完成任务,如图所示。

在给异步任务添加了一些任务延迟后,我发现同步版本通过了测试,异步版本失败了。

因此,问题的最终解决方案并非上述任何一种。事实证明,链接选项传播(至少这是我的猜测)将传播不满足linkTo中谓词的块的完成。因此,当一个状态为"完成"的东西出现时,它会进入区块1。

哦,我应该指出,在完整的测试代码中,我制作了所有的块1,2&3连接到相同的EndBlock,这在原始代码中没有显示。

不管怎样,在谓词被满足并且Thing进入块1之后,我相信块2和块3被设置为完成。这导致我们在单元测试中等待的EndBlock完成,而Assert失败,因为Block1还没有完成它的工作。

如果我的猜测是正确的,我会觉得这是链接选项完成传播中的错误或设计错误。如果一个块从未使用过,为什么它应该是完整的?

下面是我为解决这个问题所做的。我去掉了链接选项,并手动连接了完成事件。像这样:

MainBlock.Completion.ContinueWith(t =>
{
Block1.Complete();
Block2.Complete();
Block3.Complete();
});
Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
.ContinueWith(t =>
{
    EndBlock.Complete();
});

这工作得很好,当转移到我的真实代码时也很好。任务.WhenAll让我相信,未使用的块被设置为完成,以及为什么自动传播是个问题。

我希望这能帮助到别人。当我发布所有测试代码时,我会回来添加一个链接。

编辑:以下是测试代码要点的链接https://gist.github.com/jmichas/bfab9cec84f0d1e40e12