BroadcastCopyBlock for TPL Dataflow with guaranteed delivery

Keywords: TPL, Dataflow, delivery, BroadcastCopyBlock | Updated: 2023-09-27 18:18:09

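I would be glad to get some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies each received message to all consumers that are registered with the BroadcastCopyBlock, and it guarantees delivery to every consumer that is linked to the block at the time the message is received. (This is unlike BroadcastBlock, which does not guarantee delivery: if the next message comes in before the previous one has been delivered to all consumers, the previous one can be lost.)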

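My main concern is the reservation of messages and the release of those reservations. What happens if a receiving block decides not to handle a message? My understanding is that this would create a memory leak, because the message would be held indefinitely. I think I should mark the message as unused, but I'm not sure how to do that. I was considering some artificial message sink (an ActionBlock with no action), or can I simply mark the message as discarded?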

Any further input on the implementation is also welcome.

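This is probably almost a duplicate of the following question, but I would prefer to use my own class rather than a method that creates the block. Or would that be considered bad style?
BroadcastBlock with guaranteed delivery in TPL Dataflow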

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message; all consumers receive an identical message.
/// </summary>
/// <typeparam name="T">The type of the broadcast messages.</typeparam>
public class BroadcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }
    /// <summary>
    /// Holds a TransformBlock for each target that subscribed to this block
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();

    public BroadcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));
        In.Completion.ContinueWith(task =>
                                   {
                                       if (task.Exception == null)
                                           Complete();
                                       else
                                           Fault(task.Exception);
                                   }
          );
    }
    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target">The target that the new transform block will be linked to.</param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;
        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }
    private void Process(T message)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }
    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }
    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }
    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));
    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }
    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }
    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }
    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}


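TL;DR
Your implementation uses the Post method inside an ActionBlock, which will still lose data if a target rejects the message. Switch to the SendAsync method. You also probably don't need to implement all of these methods; an implementation of the ITargetBlock<in TInput> interface is all you need.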


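Before getting back to your main question, I'd like to clarify a few things. I think you are confusing some of the options of the TPL Dataflow library, so let me explain them here. The behavior you describe, "The first consumer, which receives the message, deletes it from the queue", is not about BroadcastBlock; it is about multiple consumers linked to an ISourceBlock such as BufferBlock: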
var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this message goes to only one consumer (consumer1, which was linked first), so there is no console output
buffer.Post(1);
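To make the contrast measurable, here is a small sketch that links two counting consumers to a given source block and reports how many messages each one received. The names FanOutComparison and CountDeliveriesAsync are made up for illustration, and the sketch assumes a .NET version where System.Threading.Tasks.Dataflow is available.

```csharp
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutComparison
{
    // Hypothetical helper: links two counting consumers to `source`, sends `messages` items
    // through it, and returns how many items each consumer received: [consumer1, consumer2].
    public static async Task<int[]> CountDeliveriesAsync(IPropagatorBlock<int, int> source, int messages)
    {
        var counts = new int[2];
        var consumer1 = new ActionBlock<int>(_ => Interlocked.Increment(ref counts[0]));
        var consumer2 = new ActionBlock<int>(_ => Interlocked.Increment(ref counts[1]));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        source.LinkTo(consumer1, options); // linked first, so it is offered each message first
        source.LinkTo(consumer2, options);

        for (var i = 0; i < messages; i++)
            await source.SendAsync(i);

        source.Complete();
        await Task.WhenAll(consumer1.Completion, consumer2.Completion);
        return counts;
    }
}
```

With `new BufferBlock<int>()` and 3 messages the result is {3, 0}: the first linked consumer accepts, and thereby removes, every message. With `new BroadcastBlock<int>(i => i)` both consumers receive all 3 messages, as long as neither of them declines.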

BroadcastBlock does exactly what you describe. Consider the following code:

private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Unbounded Block: {i}");
        });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be

FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2

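However, this only works while data arrives more slowly than it is processed. Otherwise, as you noted in your question, the buffers keep growing and you will quickly run out of memory. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming data buffer of the slow block: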

private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW Bounded Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be

FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1

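As you can see, our slow block lost the last message, which is not what we want. The reason is that BroadcastBlock uses the Post method to deliver messages by default. According to the official Introduction to TPL Dataflow document:

  • Post
    • An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow for the target to consume the message at a later time.
  • SendAsync
    • An extension method that asynchronously sends to target blocks while supporting buffering. A Post operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered and the target must instead be forced to decline. SendAsync enables asynchronous posting of the data with buffering, such that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.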
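The difference between the two methods shows up with a target whose BoundedCapacity is 1 and which is kept busy by a gate. This is an illustrative sketch; the PostVsSendAsyncDemo class and the gate are not part of the original code.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVsSendAsyncDemo
{
    // Returns the result of Post(1), the result of Post(2), whether SendAsync(3) had
    // completed before the target was unblocked, and the final result of SendAsync(3).
    public static async Task<(bool Post1, bool Post2, bool SendCompletedEarly, bool Send3)> RunAsync()
    {
        // The gate keeps the target busy with its first message until we open it.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var target = new ActionBlock<int>(
            _ => gate.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool post1 = target.Post(1);            // accepted: the single slot is free
        bool post2 = target.Post(2);            // declined: the slot is taken, so message 2 is lost
        Task<bool> send3 = target.SendAsync(3); // postponed: SendAsync holds message 3 for later
        bool sendCompletedEarly = send3.IsCompleted;

        gate.SetResult(true);                   // finish message 1, which frees the slot
        bool send3Result = await send3;         // the target now consumes the postponed message 3

        target.Complete();
        await target.Completion;
        return (post1, post2, sendCompletedEarly, send3Result);
    }
}
```

`RunAsync()` yields `(True, False, False, True)`: Post(2) is declined on the spot and that message is gone, while SendAsync(3) stays pending until the slot frees up and is then accepted.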

So this method can help us with our task. Let's introduce a wrapper ActionBlock that does exactly what we want: it uses SendAsync to pass the data to our real processor:

private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}

The output will be

FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2

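But this wait never ends: our basic wrapper does not propagate completion to the linked blocks, and an ActionBlock cannot be linked to anything. We could try waiting for the wrapper to complete instead: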

private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
        {
            Thread.Sleep(2000);
            Console.WriteLine($"SLOW finite Block: {i}");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}

The output will be

FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0

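This is definitely not what we want: the wrapper ActionBlock has finished all of its work, and the processing of the last message is not awaited. Moreover, we don't even see the second message, because we exit the method before the Sleep call finishes! So you definitely need your own implementation.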

Now, finally, some thoughts about your code:

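  1. You don't need to implement so many methods: your wrapper will be used as an ITargetBlock<in TInput>, so implement only that interface.
  2. Your implementation uses the Post method inside an ActionBlock, which, as we have seen, can lead to data loss if something goes wrong on the consumer side. Consider the SendAsync method instead.
  3. After the previous change, you should measure the performance of your dataflow: if you have many asynchronous waits for data delivery, you may see performance and/or memory problems. These should be fixable with some of the advanced settings discussed in the linked documentation.
  4. Your implementation of the Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I don't think is good practice. You should probably create an end block for your dataflow (this can even be a NullTarget block, which simply discards incoming messages synchronously) and wait for it to complete.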
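Putting points 1, 2 and 4 together, one possible shape is a block that implements only ITargetBlock<T>, forwards every item to a fixed set of targets with SendAsync, and exposes its own completion rather than the targets'. This is a minimal sketch under assumptions that are not in the original: the targets are passed to the constructor instead of being attached with LinkTo, a target that declines permanently faults the block, and GuaranteedBroadcastTarget is a hypothetical name, not a library type.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical type name; a sketch, not part of TPL Dataflow.
public sealed class GuaranteedBroadcastTarget<T> : ITargetBlock<T>
{
    private readonly ActionBlock<T> _inner;
    private readonly ITargetBlock<T>[] _targets;

    public GuaranteedBroadcastTarget(IEnumerable<ITargetBlock<T>> targets,
                                     int boundedCapacity = DataflowBlockOptions.Unbounded)
    {
        _targets = targets.ToArray();
        _inner = new ActionBlock<T>(async item =>
        {
            // SendAsync waits while a bounded target postpones, instead of dropping the item as Post would.
            bool[] accepted = await Task.WhenAll(_targets.Select(t => t.SendAsync(item)));
            // A completed or faulted target declines for good; fault rather than lose data silently.
            if (accepted.Contains(false))
                throw new InvalidOperationException("A target declined a broadcast message.");
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Once every item has been handed over, complete (or fault) all targets.
        _inner.Completion.ContinueWith(t =>
        {
            foreach (var target in _targets)
            {
                if (t.IsFaulted) target.Fault(t.Exception);
                else target.Complete();
            }
        }, TaskScheduler.Default);
    }

    // Completes once every item has been accepted by every target, not when the targets have processed it.
    public Task Completion => _inner.Completion;

    public void Complete() => _inner.Complete();

    public void Fault(Exception exception) => ((IDataflowBlock)_inner).Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue,
                                              ISourceBlock<T> source, bool consumeToAccept) =>
        ((ITargetBlock<T>)_inner).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
```

Each item is held until every target has accepted it, so a slow bounded target throttles the whole broadcast; that is the cost of guaranteed delivery. To wait for processing to finish, wait on the targets' own Completion tasks, or on an end block placed after them, as point 4 suggests.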

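I just want to add to VMAtm's excellent answer that in BoundedWrapperInfiniteCase you can propagate completion manually. Add the following lines before the calls to broadcast.SendAsync(), so that each action wrapper completes its inner action, and then wait for both inner actions to complete: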

slowActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
        else slowAction.Complete();
    });
fastActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
        else fastAction.Complete();
    });

var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
// Manually propagate completion to the inner actions
slowActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
        else slowAction.Complete();
    });
fastActionWrapper.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
        else fastAction.Complete();
    });
for (var i = 0; i < 3; ++i)
    broadcast.SendAsync(i);
broadcast.Complete();
// Wait for both inner actions to complete
Task.WaitAll(slowAction.Completion, fastAction.Completion);

The output will be the same as in VMAtm's answer, but all tasks will complete properly.
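Because the two ContinueWith calls differ only in the blocks involved, they could be folded into an extension method. PropagateCompletionTo is a hypothetical helper, not part of TPL Dataflow; it reproduces the same complete-or-fault logic as the code above.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionPropagation
{
    // Hypothetical extension method: when `source` finishes, fault `target` with the same
    // exception if `source` faulted, otherwise complete it. Returns the continuation task.
    public static Task PropagateCompletionTo(this IDataflowBlock source, IDataflowBlock target)
    {
        return source.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) target.Fault(t.Exception);
            else target.Complete();
        }, TaskScheduler.Default);
    }
}
```

With it, the manual propagation becomes `slowActionWrapper.PropagateCompletionTo(slowAction);` and `fastActionWrapper.PropagateCompletionTo(fastAction);`, placed before the calls to broadcast.SendAsync().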