数据流块中的不同消息类型
本文关键字:消息 类型 数据流 | 更新日期: 2023-09-27 18:36:32
我需要设计自定义 datafloaw 块,它的作用类似于缓冲区,但在一段时间后会使项目可用。我会将传入的消息放入队列并启动计时器。当计时器被触发时,我会将一个项目从队列移动到 BufferBlock 中,这将使它可供数据流使用者使用。
如果我将一个项目从内部队列移动到计时器处理程序的输出 BufferBlock 中,显然它不会是线程安全的,因为计时器处理程序可能会与排队调用和损坏队列发生冲突。MSDN 中声称数据流基于 Actor 思想,该思想假定消息以单线程方式执行,从而解决了同步问题。但是,如果我引入一个计时器处理程序,这将打破这一假设。我可以在队列上使用老式锁,也可以使用 CuncurrentQueue,但我很好奇是否有更特殊的数据流方法来管理计时器,以便它不会与数据流块的 Post() 调用冲突。
或者扩展这个问题,是否有一种优雅的方法可以让数据流块处理几种不同类型的消息,并且仍然提供线程安全模型?
我需要设计自定义 datafloaw 块,它的作用类似于缓冲区,但在一段时间后会使项目可用。
所以,像这样的事情?
public static TransformBlock<T, T> Delay<T>(Timespan delay)
{
return new TransformBlock<T, T>(async x =>
{
await Task.Delay(delay);
return x;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
}
如果您仍然认为需要自定义块,请务必阅读实现自定义数据流块指南,其中描述了您需要担心的所有锁。
从您引用的 MSDN 页面:
由于运行时管理数据之间的依赖关系,因此通常可以避免同步对共享数据的访问。
这意味着,在代码中使用数据流块时,通常不必担心同步,因为这些块会为你执行此操作。
但是,在编写自定义数据流块时,确实需要自己处理同步。例如,假设您正在实施BufferBlock
。在该块上调用Post()
必须以某种方式同步,因为两个源块可以同时调用Post()
。没有人会为你处理这种同步,所以你对Post()
* 的实现需要使用锁或ConcurrentQueue
或类似的东西。
* 其实你实现的不是Post()
,而是实现OfferMessage()
。
但是,如果我正确理解您的要求,您可以通过利用TDF中已经存在的同步来实际实现您的块而无需任何手动同步。您将通过使用两个BufferBlock
来实现您的块,一个帮助程序Task
和DataflowBlock.Encapsulate()
:
public static IPropagatorBlock<T, T> CreateDelayedBlock<T>(TimeSpan delay)
{
var source = new BufferBlock<T>();
var target = new BufferBlock<T>();
Task.Run(
async () =>
{
while (await source.OutputAvailableAsync())
{
T item;
if (source.TryReceive(out item))
{
await Task.Delay(delay);
await target.SendAsync(item);
}
else
{
// this shouldn't happen
// nobody else should be able to receive from source
}
}
// TODO: if source failed, fail target
target.Complete();
});
return DataflowBlock.Encapsulate(source, target);
}