使用TPL数据流来封装以动作块结尾的管道

本文关键字:结尾 管道 TPL 数据流 封装 使用 | 更新日期: 2023-09-27 18:11:39

TPL Dataflow提供了一个非常有用的功能:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)

使您能够将多个块封装到单个转换块中。它返回一个

IPropagatorBlock<TInput, TOutput>

表示管道的开始和结束块。

然而,如果我的管道中的最后一个块是ActionBlock,我不能使用这个,因为ActionBlock不是SourceBlock,函数的返回类型将是ITargetBlock,而不是IPropagatorBlock。

本质上,我要找的是这样的函数:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)

这是一个明智的事情写,还是我错过了一些简单的?我不太确定如何写 -特别是连接完成。我需要创建自己的自定义块类型吗?

编辑:

好的,在阅读了@Panagiotis Kanavos的回复并做了一些修改后,我想出了这个。这是基于封装传播器类,这是现有的DataflowBlock。封装方法使用:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
        private readonly ITargetBlock<TStart> startBlock;
        private readonly ActionBlock<TEnd> endBlock;
        public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
        {
            this.startBlock = startBlock;
            this.endBlock = endBlock;
        }
        public Task Completion
        {
            get { return this.endBlock.Completion; }
        }
        public void Complete()
        {
            this.startBlock.Complete();
        }
        void IDataflowBlock.Fault(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }
            this.startBlock.Fault(exception);
        }
        public DataflowMessageStatus OfferMessage(
            DataflowMessageHeader messageHeader, 
            TStart messageValue, 
            ISourceBlock<TStart> source, 
            bool consumeToAccept)
        {
            return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
        }
    }

使用TPL数据流来封装以动作块结尾的管道

encapsulation不是用来抽象现有的管道,而是用来创建一个传播器块,该块需要使用现有块和链接无法实现的自定义行为。

例如,滑动窗口示例缓冲所有发送到其输入块的传入消息,并在滑动窗口到期时输出一批所有检索到的消息到其输出块。

方法的名字会引起很多混淆,但是当你理解它们的目的时,它们确实是有意义的:

  • target参数是目标(输入)端点,前面的块将连接到该端点以发送消息。在这种情况下,处理传入消息并决定是否发送到输出(源)块的ActionBlock是有意义的。
  • source参数是源(输出)端点,后续步骤将连接到该端点以接收消息。使用ActionBlock作为源是没有意义的,因为它没有有任何输出。

接受ActionBlock方法作为sourceEncapsulate变体是没有用的,因为你可以简单地从任何先前的步骤链接到一个动作块。

编辑

如果你想要模块化管道,也就是把它分解成可重用的,更易于管理的,你可以创建一个类来构造,你可以使用一个普通的旧类。在该类中,您像往常一样构建管道片段,链接块(确保传播完成),然后将第一步和最后一步的完成任务公开为公共属性,例如:

class MyFragment
{
    public TransformationBlock<SomeMessage,SomeOther> Input {get;}
    public Task Completion {get;}
    ActionBlock<SomeOther> _finalBlock;
    public MyFragment()
    {
        Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
        Input.LinkTo(_finalBlock,linkOptions);
    }
    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }
    private void MyMethod(SomeOther msg)
    {
    ...
    }
}

要将片段连接到管道,只需要从管道块链接到暴露的Input块。要等待完成,只需等待公开的Completion任务。

你可以停在这里,如果你想,或者你可以实现ITargetBlock,使片段看起来像一个目标块。您只需要将所有方法委托给Input块,并将Completion属性委托给final块。

,

class MyFragment:ITargetBlock<SomeMessage> 
{
    ....
    public Task Completion {get;}
    public void Complete()
    {
        Input.Complete()
    };
    public void Fault(Exception exc)
    {
        Input.Fault(exc);
    }
    DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
    {
        return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}

编辑2

使用@bornfromanegg的类可以将构建片段的行为与公开输入和完成的样板文件分开:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);
    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);
    return new EncapsulatingTarget(input,finalBlock);
}

在我的例子中,我想封装一个由多个最终ActionBlock组成的网络,并使用汇总完成,因此在编辑问题中概述的解决方案不起作用。

因为与"final block"的唯一交互围绕着补全,所以仅为封装呈现补全任务就足够了。(根据建议添加了目标操作构造函数)

public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
    private readonly ITargetBlock<TInput> startBlock;
    private readonly Task completion;
    public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
    {
        this.startBlock = startBlock;
        this.completion = completion;
    }
    public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
    {
        this.startBlock = startBlock;
        completion = endBlock.Completion;
    }
    public Task Completion => completion;
    public void Complete()
    {
        startBlock.Complete();
    }
    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException("exception");
        }
        startBlock.Fault(exception);
    }
    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TInput messageValue,
        ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}

用法示例:

public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
    var splitter = new BroadcastBlock<Client.InputRecord>(null);
    var getresults = new TransformManyBlock(...);    // propagator
    var saveinput = new ActionBlock(...);
    var saveresults = new ActionBlock(...);
    splitter.LinkTo(saveinput, PropagateCompletion);
    splitter.LinkTo(getresults, PropagateCompletion);
    getresults.LinkTo(saveresults, PropagateCompletion);
    return new Util.EncapsulatedTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}

我本可以创建签名EncapsulatingTarget<T>(ITargetBlock<T> target, params Task[] completions)并将WhenAll(...)移动到构造函数中,但不想对期望的完成通知做假设。