TPL 数据流块在传播完成时永远不会完成

本文关键字:永远 完成时 数据流 传播 TPL | 更新日期: 2023-09-27 17:57:19

自从上次更改我的传播完成管道以来,我的一个缓冲区块从未完成。让我总结一下哪些有效,哪些不再有效:

以前工作:

A.LinkTo(B, PropagateCompletion);
B.LinkTo(C, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
D.Receive();
// everything completes

不再工作:

A.LinkTo(B, PropagateCompletion);
C.LinkTo(D, PropagateCompletion);
await A.Completion;
someWriteOnceBlock.Post(B.Count);
// B.Complete(); commented on purpose
B.LinkTo(C, PropagateCompletion);
D.Receive();
// Only A reaches completion
// B remains in 'waiting for activation'
// C executes but obviously never completes since B doesn't either

如果我取消注释注释的行,一切正常,但显然该行不是必需的。

不知何故,我的 BufferBlock B 永远不会完成,即使链接到它的块已完成并传播其完成,并且从它链接的块接收所有缓冲项。

TPL 数据流块在传播完成时永远不会完成

通过等待 A 的完成,在 A 完成之前,不会执行任何剩余的代码。这就是 await 的工作方式 - 代码包装在延续中后,准备完成等待的代码。因此,在这种情况下,B 在 A 完成后链接到 A,因此我认为不会传播完成。

我遇到了同样的问题 - 我的最后一个块是一个变形块,它只是坐在那里等待完成。

仅当一个块没有剩余的输入要处理,它的所有输出都已处理并且已完成时,它才会完成,或者通过调用块。完成或完成沿链向下传播。

我通过将最后一个块设置为操作块来解决此问题,因为这不会产生任何输出。这是我的代码,它采用一系列文本文件并反序列化其内容:

Dim maxDop = Environment.ProcessorCount
Dim executionOptions = New ExecutionDataflowBlockOptions With {.MaxDegreeOfParallelism = maxDop}
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
Dim inputBlock = New BufferBlock(Of String)
Dim transformBlock = New TransformBlock(Of String, IEnumerable(of JsonObject))(Function(fileName) DeserialiseFromFileAsync(fileName)
Dim outputBlock As New BufferBlock(Of IEnumerable(of JsonObject))
Dim combineBlock = New ActionBlock(Of IEnumerable(of JsonObject))(Sub(col) ' Do something to add all the collections together)
inputBlock.LinkTo(transformBlock, linkOptions)
transformBlock.LinkTo(outputBlock, linkOptions)
outputBlock.LinkTo(combineBlock, linkOptions)
For fileNo = 1 To 10
    inputBlock.Post(String.Concat("JsonFile", fileNo, ".txt"))
Next
inputBlock.Complete() 'Complete the first block, propogation will handle the rest
Await combineBlock.Completion 'Await the last block completing