如何发出数据流完成的信号

本文关键字:信号 何发出 数据流 | 更新日期: 2023-09-27 18:20:26

我有一个类,它使用TPL数据流实现了由3个步骤组成的数据流。

在构造函数中,我将步骤创建为TransformBlocks,并使用LinkTo将它们链接起来,同时将DataflowLinkOptions.PropagateCompletion设置为true。该类公开了一个方法,该方法通过在第一步调用SendAsync来启动工作流。该方法返回工作流最后一步的"完成"属性。

目前,工作流中的步骤似乎按预期执行,但除非我明确调用Complete,否则最后一步永远不会完成。但这样做会使工作流短路,并且没有执行任何步骤?我做错了什么?

public class MessagePipeline {
   private TransformBlock<object, object> step1;
   private TransformBlock<object, object> step2;
   private TransformBlock<object, object> step3;
   public MessagePipeline() {
      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
      step1 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step1...");
        return x;
      });
      step2 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step2...");
        return x;
      });
      step3 = new TransformBlock<object, object>(
        x => {
        Console.WriteLine("Step3...");
        return x;
      });
      step1.LinkTo(step2, linkOptions);
      step2.LinkTo(step3, linkOptions);
   }
   public Task Push(object message) {
      step1.SendAsync(message);
      step1.Complete();
      return step3.Completion;
   }
}
...
public class Program {
  public static void Main(string[] args) {
    var pipeline = new MessagePipeline();
    var result = pipeline.Push("Hello, world!");
    result.ContinueWith(_ => Console.WriteLine("Completed"));
    Console.ReadLine();
  }
}

如何发出数据流完成的信号

当您链接步骤时,您需要传递一个将PropagateCompletion属性设置为true的DataflowLinkOptions,以传播完成和错误。一旦这样做,在第一个块上调用Complete()将把完成分配给下游块。

一旦块接收到完成事件,它就会完成处理,然后通知其链接的下游目标。

通过这种方式,您可以将所有数据发布到第一步并调用Complete()。只有当所有上游区块都已完成时,最终区块才会完成。

例如,

var linkOptions=new DataflowLinkOptions { PropagateCompletion = true};
myFirstBlock.LinkTo(mySecondBlock,linkOptions);
mySecondBlock.LinkTo(myFinalBlock,linkOptions);
foreach(var message in messages)
{
    myFirstBlock.Post(message);
}
myFirstBlock.Complete();
......
await myFinalBlock.Completion;

默认情况下,PropagateCompletion不是真的,因为在更复杂的场景中(例如非线性流或动态变化流),您不希望完成和错误自动传播。如果您希望在不终止整个流的情况下处理错误,您可能还希望避免自动完成。

早在TPL数据流处于测试版时,默认的true,但在RTM 上更改了这一点

更新

代码永远不会完成,因为最后一步是TransformBlock,没有链接的目标来接收其输出。这意味着,即使块接收到完成信号,它也没有完成所有工作,并且不能更改自己的完成状态。

将其更改为ActionBlock<object>可消除此问题。

您需要显式调用Complete。