在带有TPL数据流的预定义块之上创建可重用的处理逻辑

本文关键字:创建 处理 TPL 数据流 预定义 | 更新日期: 2023-09-27 18:13:50

我爱TPL数据流。

嗯,一个有趣的设计选择是,大多数预定义块使用委托来允许我们实现处理逻辑。这在简单的情况下看起来很好。但是让我们考虑一下现实世界中的大型应用程序,它们需要模块化和封装。我发现用DELEGATE的方法编写一个结构良好的应用程序是很困难和不自然的。

例如,如果我想要的只是一个MultiplyIntByTwoTransformBlock和一个NoOpActionBlock作为可重用的类类型(不是实例)。我如何实现它?我希望我可以继承TransformBlock/ActionBlock,并说,覆盖一些Process()方法来实现这一点。但是预定义的块是密封的。他们只接受委托。

我知道我可以从头开始创建一个自定义块,但显然这对我来说太复杂了,因为我需要的只是在预定义块的基础上进行一点自定义。

那么,我如何实现我的目标呢?

更新:我并不是说有些事情代表们不能做。我是说在模板方法模式中暴露抽象块在很多情况下更好。比如说,我希望我可以编写一个AbstractMultiplyBlock、MultiplyByTwoBlock和MultiplyByThreeBlock,利用多态性的优势。不幸的是,委托不提供这种数据逻辑可重用性。

在带有TPL数据流的预定义块之上创建可重用的处理逻辑

我不明白为什么你需要自定义块类型。辅助方法应该足够了:

public static IPropagatorBlock<int, int> CreateMultiplyIntTransformBlock(
    int multiplier)
{
    return new TransformBlock<int, int>(i => i * multiplier);
}
public static IPropagatorBlock<int, int> CreateMultiplyIntByTwoTransformBlock()
{
    return CreateMultiplyIntTransformBlock(2);
}

如果你认为一个委托对你来说是不够的,那么也许你试图把你的逻辑放在错误的地方。委托没有理由不能使用正确使用封装和模块化的对象。这样,您的应用程序逻辑与执行代码的逻辑保持分离。

但是如果你真的想做你所问的,你可以通过封装一个TransformBlock在一个自定义类实现IPropgatorBlock,也有你的抽象Process()方法。但是正确地做到这一点有点复杂,详细信息请参阅实现自定义TPL数据流块指南。

现在有一个开源库DataflowEx,它是专门为解决这个问题设计的。此外,它还提供了更多的特性来帮助构造和表示数据流图。

声明:我是DataflowEx的作者。这是为了回答我自己的问题。希望这对其他人也有帮助:)

您可以创建一个抽象类型,实现与目标块类型相同的接口(TransformBlock实现IPropagatorBlockIReceivableSourceBlock)。

与其复制block的行为,不如将所有的方法调用委托给该类型的innerBlock

public abstract class AbstractMultiplyBlock<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
    private readonly TransformBlock<TInput, TOutput> innerBlock;
    protected AbstractMultiplyBlock(TransformBlock<TInput, TOutput> innerBlock)
    {
        this.innerBlock = innerBlock;
    }
    // ... interface implementations omitted for brevity, see appendix
}

这个抽象类具有与TransformBlock类相同的所有属性和方法。现在,创建将TransformBlock的实例传递给基构造函数的派生类型。

public sealed class MultiplyByTwoBlock : AbstractMultiplyBlock<int, int>
{
    public MultiplyByTwoBlock()
        : base(new TransformBlock<int, int>(x => x * 2))
    {
    }
}
public sealed class MultiplyByThreeBlock : AbstractMultiplyBlock<int, int>
{
    public MultiplyByThreeBlock()
        : base(new TransformBlock<int, int>(x => x * 3))
    {
    }
}

的用法与TransformBlock

的任何其他实例相同
var calculator1 = new MultiplyByTwoBlock();
var calculator2 = new MultiplyByThreeBlock();
calculator1.LinkTo(calculator2);
// x = 10 * 2 * 3
calculator1.Post(10);
var x = calculator2.Receive();

附录

AbstractMultiplyBlock完整源代码

public abstract class AbstractMultiplyBlock<TInput, TOutput>
    : IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
    private readonly TransformBlock<TInput, TOutput> innerBlock;
    protected AbstractMultiplyBlock(TransformBlock<TInput, TOutput> innerBlock)
    {
        this.innerBlock = innerBlock;
    }
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source,
        bool consumeToAccept)
    {
        return ((ITargetBlock<TInput>)innerBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
    public void Complete()
    {
        innerBlock.Complete();
    }
    public void Fault(Exception exception)
    {
        ((IDataflowBlock)innerBlock).Fault(exception);
    }
    public Task Completion
    {
        get { return innerBlock.Completion; }
    }
    public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
    {
        return innerBlock.LinkTo(target, linkOptions);
    }
    public TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
    {
        return ((ISourceBlock<TOutput>)innerBlock).ConsumeMessage(messageHeader, target, out messageConsumed);
    }
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        return ((ISourceBlock<TOutput>)innerBlock).ReserveMessage(messageHeader, target);
    }
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
    {
        ((ISourceBlock<TOutput>)innerBlock).ReleaseReservation(messageHeader, target);
    }
    public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
    {
        return innerBlock.TryReceive(filter, out item);
    }
    public bool TryReceiveAll(out IList<TOutput> items)
    {
        return innerBlock.TryReceiveAll(out items);
    }
}

到目前为止,我的自定义块已经被简单地编写为使用DataflowBlock.Encapsulate的一堆工厂方法(如果需要的话)。

对于TransformBlock的简单扩展,它返回传递给它的相同项,它看起来像这样:

public static class MutatorBlock {
    public static TransformBlock<T, T> New<T>(Action<T> action, ExecutionDataflowBlockOptions options) {
        return new TransformBlock<T, T>(
            input => {
                action(input);
                return input;
            }, options);
    }
}

要实例化这个类,您使用MutatorBlock.New(...)而不是new MutatorBlock(...),但除此之外,没有太大的区别。

我想我想知道的是,你到底需要一个类型做什么?继承当然不能工作,但是组合仍然可以。你能举个例子说明这是一个主要问题吗?