意外行为 - TPL 数据流批处理块在执行触发器批处理时拒绝项目

本文关键字:批处理 触发器 执行 拒绝 项目 TPL 数据流 意外 | 更新日期: 2023-09-27 18:36:07

当您创建具有有限容量的批处理块并在(并行)发布新项目时调用 triggerBatch 时 - 在触发批处理执行期间发布新项目将失败。

调用触发器批处理(每 X 次)是为了确保数据在块中不会延迟太长时间,以防传入数据流暂停或减慢。

以下代码将输出一些"失败后"事件。例如:

    public static void Main(string[] args)
    {
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock);
        var producerTask = Task.Factory.StartNew(() =>
        {
            //Post 10K Items
            for (int i = 0; i < 10000; i++)
            {
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            }
        });
        var triggerBatchTask = Task.Factory.StartNew(() =>
            {                    
                //Trigger Batch..
                for (int i = 0; i < 1000000; i++)
                    batchBlock.TriggerBatch();
            });
        producerTask.Wait();
        triggerBatchTask.Wait();
    }
    public static void ProcessBatch(int[] batch)
    {
        Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
    }

*请注意,仅当批处理块为有界时,此方案才可重现。

我错过了什么还是批处理块的问题?

意外行为 - TPL 数据流批处理块在执行触发器批处理时拒绝项目

BatchBlock并没有

真正拒绝该项目,而是试图推迟它。除了在Post()的情况下,推迟不是一种选择。解决此问题的一种简单方法是使用 await batchBlock.SendAsync(i) 而不是 batchBlock.Post(i)(这也意味着您需要将Task.Factory.StartNew(() =>更改为 Task.Run(async () => )。

为什么会这样?根据源代码,如果BatchBlock是有界的,则异步处理TriggerBatch(),并且在处理时不接受任何新项。

无论如何,您不应该期望Post()总是在有界块上返回true,如果块已满,Post()也会返回false