在带有TPL数据流的预定义块之上创建可重用的处理逻辑
本文关键字:创建 处理 TPL 数据流 预定义 | 更新日期: 2023-09-27 18:13:50
我爱TPL数据流。
嗯,一个有趣的设计选择是,大多数预定义块使用委托来允许我们实现处理逻辑。这在简单的情况下看起来很好。但是让我们考虑一下现实世界中的大型应用程序,它们需要模块化和封装。我发现用DELEGATE的方法编写一个结构良好的应用程序是很困难和不自然的。
例如,如果我想要的只是一个MultiplyIntByTwoTransformBlock
和一个NoOpActionBlock
作为可重用的类类型(不是实例)。我如何实现它?我希望我可以继承TransformBlock
/ActionBlock
,并说,覆盖一些Process()
方法来实现这一点。但是预定义的块是密封的。他们只接受委托。
我知道我可以从头开始创建一个自定义块,但显然这对我来说太复杂了,因为我需要的只是在预定义块的基础上进行一点自定义。
那么,我如何实现我的目标呢?
更新:我并不是说有些事情代表们不能做。我是说在模板方法模式中暴露抽象块在很多情况下更好。比如说,我希望我可以编写一个AbstractMultiplyBlock、MultiplyByTwoBlock和MultiplyByThreeBlock,利用多态性的优势。不幸的是,委托不提供这种数据逻辑可重用性。
我不明白为什么你需要自定义块类型。辅助方法应该足够了:
public static IPropagatorBlock<int, int> CreateMultiplyIntTransformBlock(
int multiplier)
{
return new TransformBlock<int, int>(i => i * multiplier);
}
public static IPropagatorBlock<int, int> CreateMultiplyIntByTwoTransformBlock()
{
return CreateMultiplyIntTransformBlock(2);
}
如果你认为一个委托对你来说是不够的,那么也许你试图把你的逻辑放在错误的地方。委托没有理由不能使用正确使用封装和模块化的对象。这样,您的应用程序逻辑与执行代码的逻辑保持分离。
但是如果你真的想做你所问的,你可以通过封装一个TransformBlock
在一个自定义类实现IPropgatorBlock
,也有你的抽象Process()
方法。但是正确地做到这一点有点复杂,详细信息请参阅实现自定义TPL数据流块指南。
现在有一个开源库DataflowEx,它是专门为解决这个问题设计的。此外,它还提供了更多的特性来帮助构造和表示数据流图。
声明:我是DataflowEx的作者。这是为了回答我自己的问题。希望这对其他人也有帮助:)
您可以创建一个抽象类型,实现与目标块类型相同的接口(TransformBlock
实现IPropagatorBlock
和IReceivableSourceBlock
)。
与其复制block的行为,不如将所有的方法调用委托给该类型的innerBlock
。
public abstract class AbstractMultiplyBlock<TInput, TOutput>
: IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
private readonly TransformBlock<TInput, TOutput> innerBlock;
protected AbstractMultiplyBlock(TransformBlock<TInput, TOutput> innerBlock)
{
this.innerBlock = innerBlock;
}
// ... interface implementations omitted for brevity, see appendix
}
这个抽象类具有与TransformBlock
类相同的所有属性和方法。现在,创建将TransformBlock
的实例传递给基构造函数的派生类型。
public sealed class MultiplyByTwoBlock : AbstractMultiplyBlock<int, int>
{
public MultiplyByTwoBlock()
: base(new TransformBlock<int, int>(x => x * 2))
{
}
}
public sealed class MultiplyByThreeBlock : AbstractMultiplyBlock<int, int>
{
public MultiplyByThreeBlock()
: base(new TransformBlock<int, int>(x => x * 3))
{
}
}
的用法与TransformBlock
var calculator1 = new MultiplyByTwoBlock();
var calculator2 = new MultiplyByThreeBlock();
calculator1.LinkTo(calculator2);
// x = 10 * 2 * 3
calculator1.Post(10);
var x = calculator2.Receive();
附录
AbstractMultiplyBlock
完整源代码
public abstract class AbstractMultiplyBlock<TInput, TOutput>
: IPropagatorBlock<TInput, TOutput>, IReceivableSourceBlock<TOutput>
{
private readonly TransformBlock<TInput, TOutput> innerBlock;
protected AbstractMultiplyBlock(TransformBlock<TInput, TOutput> innerBlock)
{
this.innerBlock = innerBlock;
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source,
bool consumeToAccept)
{
return ((ITargetBlock<TInput>)innerBlock).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
public void Complete()
{
innerBlock.Complete();
}
public void Fault(Exception exception)
{
((IDataflowBlock)innerBlock).Fault(exception);
}
public Task Completion
{
get { return innerBlock.Completion; }
}
public IDisposable LinkTo(ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
{
return innerBlock.LinkTo(target, linkOptions);
}
public TOutput ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
{
return ((ISourceBlock<TOutput>)innerBlock).ConsumeMessage(messageHeader, target, out messageConsumed);
}
public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
return ((ISourceBlock<TOutput>)innerBlock).ReserveMessage(messageHeader, target);
}
public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
{
((ISourceBlock<TOutput>)innerBlock).ReleaseReservation(messageHeader, target);
}
public bool TryReceive(Predicate<TOutput> filter, out TOutput item)
{
return innerBlock.TryReceive(filter, out item);
}
public bool TryReceiveAll(out IList<TOutput> items)
{
return innerBlock.TryReceiveAll(out items);
}
}
到目前为止,我的自定义块已经被简单地编写为使用DataflowBlock.Encapsulate
的一堆工厂方法(如果需要的话)。
对于TransformBlock的简单扩展,它返回传递给它的相同项,它看起来像这样:
public static class MutatorBlock {
public static TransformBlock<T, T> New<T>(Action<T> action, ExecutionDataflowBlockOptions options) {
return new TransformBlock<T, T>(
input => {
action(input);
return input;
}, options);
}
}
要实例化这个类,您使用MutatorBlock.New(...)
而不是new MutatorBlock(...)
,但除此之外,没有太大的区别。
我想我想知道的是,你到底需要一个类型做什么?继承当然不能工作,但是组合仍然可以。你能举个例子说明这是一个主要问题吗?