数据流块中的不同消息类型

本文关键字:消息 类型 数据流 | 更新日期: 2023-09-27 18:36:32

我需要设计自定义 datafloaw 块,它的作用类似于缓冲区,但在一段时间后会使项目可用。我会将传入的消息放入队列并启动计时器。当计时器被触发时,我会将一个项目从队列移动到 BufferBlock 中,这将使它可供数据流使用者使用。

如果我将一个项目从内部队列移动到计时器处理程序的输出 BufferBlock 中,显然它不会是线程安全的,因为计时器处理程序可能会与排队调用和损坏队列发生冲突。MSDN 中声称数据流基于 Actor 思想,该思想假定消息以单线程方式执行,从而解决了同步问题。但是,如果我引入一个计时器处理程序,这将打破这一假设。我可以在队列上使用老式锁,也可以使用 CuncurrentQueue,但我很好奇是否有更特殊的数据流方法来管理计时器,以便它不会与数据流块的 Post() 调用冲突。

或者扩展这个问题,是否有一种优雅的方法可以让数据流块处理几种不同类型的消息,并且仍然提供线程安全模型?

数据流块中的不同消息类型

我需要设计自定义 datafloaw 块,它的作用类似于缓冲区,但在一段时间后会使项目可用。

所以,像这样的事情?

public static TransformBlock<T, T> Delay<T>(Timespan delay)
{
  return new TransformBlock<T, T>(async x =>
  {
    await Task.Delay(delay);
    return x;
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
}

如果您仍然认为需要自定义块,请务必阅读实现自定义数据流块指南,其中描述了您需要担心的所有锁。

从您引用的 MSDN 页面:

由于运行时管理数据之间的依赖关系,因此通常可以避免同步对共享数据的访问。

这意味着,在代码中使用数据流块时,通常不必担心同步,因为这些块会为你执行此操作

但是,在编写自定义数据流块时,确实需要自己处理同步。例如,假设您正在实施BufferBlock 。在该块上调用Post()必须以某种方式同步,因为两个源块可以同时调用Post()。没有人会为你处理这种同步,所以你对Post() * 的实现需要使用锁或ConcurrentQueue或类似的东西。

* 其实你实现的不是Post(),而是实现OfferMessage()

但是,如果我正确理解您的要求,您可以通过利用TDF中已经存在的同步来实际实现您的块而无需任何手动同步。您将通过使用两个BufferBlock来实现您的块,一个帮助程序TaskDataflowBlock.Encapsulate()

public static IPropagatorBlock<T, T> CreateDelayedBlock<T>(TimeSpan delay)
{
    var source = new BufferBlock<T>();
    var target = new BufferBlock<T>();
    Task.Run(
        async () =>
        {
            while (await source.OutputAvailableAsync())
            {
                T item;
                if (source.TryReceive(out item))
                {
                    await Task.Delay(delay);
                    await target.SendAsync(item);
                }
                else
                {
                    // this shouldn't happen
                    // nobody else should be able to receive from source
                }
            }
            // TODO: if source failed, fail target
            target.Complete();
        });
    return DataflowBlock.Encapsulate(source, target);
}