替代数据流BroadcastBlock,保证交付

本文关键字:交付 BroadcastBlock 数据流 | 更新日期: 2023-09-27 18:16:03

我需要某种像BroadcastBlock一样的对象,但要保证传递。所以我用了这个问题的答案。但我不太清楚这里的执行流程。我有一个控制台应用程序。下面是我的代码:

static void Main(string[] args)
{
    ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
    List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
    for (int i = 0; i <= 10; i++)
        blocks.Add(new ActionBlock<int>(num => 
        {
            int coef = i;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef); 
        }, execopt));
    ActionBlock<int> broadcaster = new ActionBlock<int>(async num => 
    {
        foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
    }, execopt);
    broadcaster.Completion.ContinueWith(task =>
        {
            foreach (ActionBlock<int> block in blocks) block.Complete();
        });
    Task producer = Produce(broadcaster);
    List<Task> ToWait = new List<Task>();
    foreach (ActionBlock<int> block in blocks) ToWait.Add(block.Completion);
    ToWait.Add(producer);
    Task.WaitAll(ToWait.ToArray());
    Console.ReadLine();
}
static async Task Produce(ActionBlock<int> broadcaster)
{
    for (int i = 0; i <= 15; i++) await broadcaster.SendAsync(i);
    broadcaster.Complete();
}

每个数字必须按顺序处理,所以不能在广播块中使用MaxDegreeOfParallelism。但是所有接收到这个数字的动作块都可以并行运行。

问题来了:

在输出中我可以看到不同的线程id。我理解对了吗?

执行到达广播器中的await block.SendAsync(num);。如果当前块还没有准备好接受这个数字,执行退出广播程序并挂起在Task.WaitAll。当block接受这个数字时,广播程序中foreach语句的其余部分在线程池中执行。一直到最后。foreach的每次迭代都在线程池中执行。但实际上它是顺序发生的。

我的理解是对还是错?我怎么能改变这个代码发送的数字异步所有块?

为了确保如果其中一个区块目前还没有准备好接收这个数字,我不会等待它,所有其他准备好的区块都将接收这个数字。所有的块都可以并行运行。并保证交货

替代数据流BroadcastBlock,保证交付

假设您想通过broadcaster一次处理一个项目,同时使目标块能够同时接收该项目,您需要更改broadcaster以同时向所有块提供该数字,然后异步等待所有一起接受该数字,然后再移动到下一个数字:

var broadcaster = new ActionBlock<int>(async num => 
{
    var tasks = new List<Task>();
    foreach (var block in blocks)
    {
        tasks.Add(block.SendAsync(num));
    }
    await Task.WhenAll(tasks);
}, execopt);

现在,在等待之后没有工作的情况下,您可以稍微优化一下,同时仍然返回一个可等待任务:

ActionBlock<int> broadcaster = new ActionBlock<int>(
    num => Task.WhenAll(blocks.Select(block => block.SendAsync(num))), execopt);