BroadcastCopyBlock for TPL Dataflow with guaranteed delivery
Keywords: TPL, Dataflow, delivery, BroadcastCopyBlock | Updated: 2023-09-27 18:18:09
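I would be glad to get some input on the following implementation of a BroadcastCopyBlock in TPL Dataflow. It copies a received message to all consumers that registered to the BroadcastCopyBlock, and it guarantees delivery to every consumer that is linked to the block at the time the message is received. (This is unlike BroadcastBlock, which does not guarantee delivery of a message if the next message arrives before the previous one has been delivered to all consumers.)
My main concern is the reservation of messages and the release of reservations. What happens if a receiving block decides not to handle a message? My understanding is that this would create a memory leak, because the message would be held indefinitely. I am thinking that I should somehow mark the message as unused, but I am not sure how to do that. I was considering some artificial message sink (an ActionBlock with no action), or can I simply mark a message as discarded?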
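Further input on the implementation is also welcome.
This is probably almost a duplicate of the following question, but I would prefer to use my own class instead of a method that creates the block. Or would that be considered bad style?
BroadcastBlock with guaranteed delivery in TPL Dataflow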
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>
/// Broadcasts the same message to multiple consumers. This does NOT clone the message; all consumers receive an identical message.
/// </summary>
/// <typeparam name="T">The type of the broadcast messages.</typeparam>
public class BroadcastCopyBlock<T> : IPropagatorBlock<T, T>
{
    private ITargetBlock<T> In { get; }

    /// <summary>
    /// Holds a TransformBlock for each target that subscribed to this block.
    /// </summary>
    private readonly IDictionary<ITargetBlock<T>, TransformBlock<T, T>> _OutBlocks = new Dictionary<ITargetBlock<T>, TransformBlock<T, T>>();

    public BroadcastCopyBlock()
    {
        In = new ActionBlock<T>(message => Process(message));
        In.Completion.ContinueWith(task =>
        {
            if (task.Exception == null)
                Complete();
            else
                Fault(task.Exception);
        });
    }

    /// <summary>
    /// Creates a transform source block for the passed target.
    /// </summary>
    /// <param name="target">The target that is being linked to this block.</param>
    private void CreateOutBlock(ITargetBlock<T> target)
    {
        if (_OutBlocks.ContainsKey(target))
            return;

        var outBlock = new TransformBlock<T, T>(e => e);
        _OutBlocks[target] = outBlock;
    }

    private void Process(T message)
    {
        // The result of Post is ignored, so a declined message is not retried.
        foreach (var outBlock in _OutBlocks.Values)
        {
            outBlock.Post(message);
        }
    }

    /// <inheritdoc />
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        return In.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }

    /// <inheritdoc />
    public void Complete()
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Complete();
        }
    }

    /// <inheritdoc />
    public void Fault(Exception exception)
    {
        foreach (var outBlock in _OutBlocks.Values)
        {
            ((ISourceBlock<T>)outBlock).Fault(exception);
        }
    }

    /// <inheritdoc />
    public Task Completion => Task.WhenAll(_OutBlocks.Select(b => b.Value.Completion));

    /// <inheritdoc />
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        CreateOutBlock(target);
        return _OutBlocks[target].LinkTo(target, linkOptions);
    }

    /// <inheritdoc />
    public T ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <inheritdoc />
    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)_OutBlocks[target]).ReserveMessage(messageHeader, target);
    }

    /// <inheritdoc />
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)_OutBlocks[target]).ReleaseReservation(messageHeader, target);
    }
}
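TL;DR
Your implementation uses the Post method inside an ActionBlock, which will still lose data if a target rejects the message; switch to the SendAsync method. You probably also don't need to implement all of these methods: an implementation of the ITargetBlock<in TInput> interface is all you need.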
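Before getting back to your main question, I want to clarify a few things. I think you are confused by some of the options of the TPL Dataflow library, so let me explain them here. The behavior you describe, "The first consumer, which receives the message, deletes it from the queue", is not about BroadcastBlock; it is about multiple consumers linked to an ISourceBlock such as BufferBlock: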
var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {});
var consumer2 = new ActionBlock<int>(i => { Console.WriteLine(i); });
buffer.LinkTo(consumer1);
buffer.LinkTo(consumer2);
// this message goes to only one consumer (consumer1, which was linked first), so there is no console output
buffer.Post(1);
BroadcastBlock does exactly what you describe. Consider the following code:
private static void UnboundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Unbounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Unbounded Block: {i}");
    });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}
The output will be:
FAST Unbounded Block: 0
FAST Unbounded Block: 1
FAST Unbounded Block: 2
SLOW Unbounded Block: 0
SLOW Unbounded Block: 1
SLOW Unbounded Block: 2
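However, this only works while data arrives more slowly than it is processed; otherwise your memory will quickly run out as the buffers grow, as you noted in your question. Let's see what happens if we use ExecutionDataflowBlockOptions to limit the incoming data buffer of the slow block: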
private static void BoundedCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Bounded Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Bounded Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    broadcast.LinkTo(slowAction, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastAction, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}
The output will be:
FAST Bounded Block: 0
FAST Bounded Block: 1
FAST Bounded Block: 2
SLOW Bounded Block: 0
SLOW Bounded Block: 1
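As you can see, our slow block lost the last message, which is not what we want. The reason is that, by default, BroadcastBlock uses the Post method to deliver messages. According to the official Introduction to TPL Dataflow document:
- Post: An extension method that asynchronously posts to the target block. It returns immediately whether the data could be accepted or not, and it does not allow the target to consume the message at a later time.
- SendAsync: An extension method that asynchronously sends to target blocks while supporting buffering. A Post operation on a target is asynchronous, but if a target wants to postpone the offered data, there is nowhere for the data to be buffered, and the target must instead be forced to decline. SendAsync enables asynchronous posting of the data with buffering, so that if a target postpones, it will later be able to retrieve the postponed data from the temporary buffer used for this one asynchronously posted message.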
So this method can help us with our task. Let's introduce a wrapper ActionBlock that does exactly what we want, namely calling SendAsync to pass the data to our real processor:
private static void BoundedWrapperInfiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW Wrapper Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowAction.Completion.Wait();
}
The output will be:
FAST Wrapper Block: 0
FAST Wrapper Block: 1
FAST Wrapper Block: 2
SLOW Wrapper Block: 0
SLOW Wrapper Block: 1
SLOW Wrapper Block: 2
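But this wait never ends: our basic wrapper does not propagate completion to the block it forwards to, and an ActionBlock cannot be linked to anything. We can try waiting for the wrapper to complete instead: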
private static void BoundedWrapperFiniteCase()
{
    var broadcast = new BroadcastBlock<int>(i => i);
    var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST finite Block: {i}"));
    var slowAction = new ActionBlock<int>(i =>
    {
        Thread.Sleep(2000);
        Console.WriteLine($"SLOW finite Block: {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
    var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
    var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
    broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
    for (var i = 0; i < 3; ++i)
    {
        broadcast.SendAsync(i);
    }
    broadcast.Complete();
    slowActionWrapper.Completion.Wait();
}
The output will be:
FAST finite Block: 0
FAST finite Block: 1
FAST finite Block: 2
SLOW finite Block: 0
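This is definitely not what we want: the wrapper ActionBlock finished all of its work and does not wait for the last message to be processed. Moreover, we don't even see the second message, because we exit the method before the Sleep call has finished! So you definitely need your own implementation.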
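Now, finally, some thoughts about your code:
- You don't need to implement this many methods. Your wrapper will be used as an ITargetBlock<in TInput>, so implement only that interface.
- Your implementation uses the Post method inside the ActionBlock, which, as we have seen, can lead to data loss if there are problems on the consumer side. Consider the SendAsync method instead.
- After that change, you should measure the performance of your dataflow: if many asynchronous sends are waiting for data to be delivered, you may see performance and/or memory problems. These should be fixable with some of the advanced settings discussed in the linked document.
- Your implementation of the Completion task actually reverses the order of your dataflow: you are waiting for the targets to complete, which I don't think is good practice. You should probably create an ending block for your dataflow (this could even be a NullTarget block, which simply discards incoming messages synchronously) and wait for it to complete.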
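Following the first two suggestions, a version that implements only ITargetBlock<T> and forwards every message with SendAsync could look roughly like this. It is a minimal sketch, not a drop-in replacement: the class name GuaranteedBroadcastBlock is hypothetical, the targets are passed to the constructor instead of being attached through LinkTo, and a target that permanently declines a message (SendAsync returning false) faults the block instead of being skipped silently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical class name; this is not a TPL Dataflow type.
public sealed class GuaranteedBroadcastBlock<T> : ITargetBlock<T>
{
    private readonly ITargetBlock<T>[] _targets;
    private readonly ActionBlock<T> _input;

    public GuaranteedBroadcastBlock(IEnumerable<ITargetBlock<T>> targets)
    {
        // Assumption: the set of targets is fixed when the block is created (no LinkTo).
        _targets = targets.ToArray();

        // Each message is sent to every target with SendAsync, one target after another.
        // A bounded target that postpones the message makes this await instead of losing it.
        _input = new ActionBlock<T>(async message =>
        {
            foreach (var target in _targets)
            {
                if (!await target.SendAsync(message))
                    throw new InvalidOperationException("A target declined the message permanently.");
            }
        });

        // Once every accepted message has been forwarded, complete (or fault) all targets.
        _input.Completion.ContinueWith(task =>
        {
            foreach (var target in _targets)
            {
                if (task.IsFaulted) target.Fault(task.Exception);
                else target.Complete();
            }
        }, TaskScheduler.Default);
    }

    // Completes when all messages have been handed to the targets, not when the targets finish.
    public Task Completion => _input.Completion;

    public void Complete() => _input.Complete();

    public void Fault(Exception exception) => ((IDataflowBlock)_input).Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, T messageValue,
        ISourceBlock<T> source, bool consumeToAccept) =>
        ((ITargetBlock<T>)_input).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
```

Because the input block awaits each SendAsync before taking the next message, a slow bounded target holds up delivery to all targets, and pending messages accumulate in the unbounded input buffer; this is the performance and memory trade-off mentioned in the third point. Callers wait on the targets' Completion tasks to know when processing has actually finished.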
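I just want to add to VMAtm's excellent answer that in BoundedWrapperInfiniteCase you can propagate completion manually. Add the following lines before the calls to broadcast.SendAsync(), and then wait for both inner actions to complete, so that the action wrappers complete the inner actions: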
slowActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
    else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
    else fastAction.Complete();
});
The complete example then looks like this:
var broadcast = new BroadcastBlock<int>(i => i);
var fastAction = new ActionBlock<int>(i => Console.WriteLine($"FAST Wrapper Block: {i}"));
var slowAction = new ActionBlock<int>(i =>
{
    Thread.Sleep(2000);
    Console.WriteLine($"SLOW Wrapper Block: {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });
var fastActionWrapper = new ActionBlock<int>(i => fastAction.SendAsync(i));
var slowActionWrapper = new ActionBlock<int>(i => slowAction.SendAsync(i));
broadcast.LinkTo(slowActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });
broadcast.LinkTo(fastActionWrapper, new DataflowLinkOptions { PropagateCompletion = true });

// Manually propagate completion to the inner actions
slowActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)slowAction).Fault(t.Exception);
    else slowAction.Complete();
});
fastActionWrapper.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)fastAction).Fault(t.Exception);
    else fastAction.Complete();
});

for (var i = 0; i < 3; ++i)
    broadcast.SendAsync(i);
broadcast.Complete();

// Wait for both inner actions to complete
Task.WaitAll(slowAction.Completion, fastAction.Completion);
The output will be the same as in VMAtm's answer, but all the tasks will complete correctly.
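The manual completion propagation above can be folded into a small factory method that builds one wrapper per consumer, which also touches on the method-versus-class question asked at the top. This is a minimal sketch under stated assumptions: DataflowWrappers and CreateSendAsyncWrapper are hypothetical names, and the wrapper treats a SendAsync that returns false as an error so that the fault reaches the inner block instead of being ignored.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowWrappers
{
    // Hypothetical helper (not part of TPL Dataflow): wraps `inner` in an ActionBlock that
    // forwards each message with SendAsync, then completes or faults `inner` when it finishes.
    public static ActionBlock<T> CreateSendAsyncWrapper<T>(ITargetBlock<T> inner)
    {
        var wrapper = new ActionBlock<T>(async item =>
        {
            // Awaiting SendAsync waits while a bounded `inner` postpones the message.
            if (!await inner.SendAsync(item))
                throw new InvalidOperationException("The inner block declined the message.");
        });

        // Same idea as the ContinueWith calls above, done once inside the helper.
        wrapper.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) inner.Fault(t.Exception);
            else inner.Complete();
        }, TaskScheduler.Default);

        return wrapper;
    }
}
```

With this helper, each consumer is linked through CreateSendAsyncWrapper(consumer) with PropagateCompletion = true, and the caller waits on the consumers' own Completion tasks, as in the example above.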