替代数据流BroadcastBlock,保证交付
本文关键字:交付 BroadcastBlock 数据流 | 更新日期: 2023-09-27 18:16:03
我需要某种像BroadcastBlock一样的对象,但要保证传递。所以我用了这个问题的答案。但我不太清楚这里的执行流程。我有一个控制台应用程序。下面是我的代码:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = Produce(broadcaster);
List<Task> ToWait = new List<Task>();
foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
ToWait.Add(producer);
Task.WaitAll(ToWait.ToArray());
Console.ReadLine();
}
static async Task Produce(ActionBlock<int> broadcaster)
{
for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);
broadcaster.Complete();
}
每个数字必须按顺序处理,所以不能在广播块中使用MaxDegreeOfParallelism。但是所有接收到这个数字的动作块都可以并行运行。
问题来了:
在输出中我可以看到不同的线程id。我理解对了吗?
执行到达广播器中的await block.SendAsync(num);
。如果当前块还没有准备好接受这个数字,执行退出广播程序并挂起在Task.WaitAll。当block接受这个数字时,广播程序中foreach语句的其余部分在线程池中执行。一直到最后。foreach的每次迭代都在线程池中执行。但实际上它是顺序发生的。
我的理解是对还是错?我怎么能改变这个代码发送的数字异步所有块?
为了确保如果其中一个区块目前还没有准备好接收这个数字,我不会等待它,所有其他准备好的区块都将接收这个数字。所有的块都可以并行运行。并保证交货
假设您想通过broadcaster
一次处理一个项目,同时使目标块能够同时接收该项目,您需要更改broadcaster
以同时向所有块提供该数字,然后异步等待所有一起接受该数字,然后再移动到下一个数字:
var broadcaster = new ActionBlock<int>(async num =>
{
var tasks = new List<Task>();
foreach (var block in blocks)
{
tasks.Add(block.SendAsync(num));
}
await Task.WhenAll(tasks);
}, execopt);
现在,在等待之后没有工作的情况下,您可以稍微优化一下,同时仍然返回一个可等待任务:
ActionBlock<int> broadcaster = new ActionBlock<int>(
num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);