如何在故障块上停止处理管道

本文关键字:处理 管道 故障 | 更新日期: 2023-09-27 17:57:24

如果其中一个块决定发生错误,我如何停止处理数据流块,从而阻止下一个块运行。我认为块可以引发异常,但不确定停止进一步处理管道的正确方法是什么。

更新:

private async void buttonDataFlow_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells == null)
        return;
    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });
    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });
    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });
    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });
    foreach (Cell c in cells)
    {
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });
        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    };
    blockPrepare.Complete();
    try
    {
        await blockTestMover.Completion;
    }
    catch(Exception ee)
    {
        Console.WriteLine(ee.Message);
    }
    Console.WriteLine("Done");
}

更新 2:

    public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                    Func<TInput, Task> action,
                    Action<Exception> exceptionHandler,
                    ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new ActionBlock<TInput>(async input =>
        {
            try
            {
                await action(input);
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
            }
        }, dataflowBlockOptions);
    }

如何在故障块上停止处理管道

如果你想要的是块中的异常意味着当前项目确实在管道中走得更远,但其他项目的处理应该继续而不会中断,那么你可以通过创建一个块来实现这一点,如果处理成功,则生成一个项目,但在引发异常时生成零个项目:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    Action<Exception> exceptionHandler,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex);
            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}

如果您有管道,则可能已经在使用PropagateCompletion = true .这意味着,如果管道中的一个块因异常而失败,则它之后的所有块也将失败。

剩下的就是停止失败块之前的所有块。为此,您可以等待管道中最后一个块的Completion。如果这样做会抛出,则通过调用第一个块来使第一个块失败Fault()。代码可能如下所示:

// set up your pipeline
try
{
    await lastBlock.Completion;
}
catch (Exception ex)
{
    ((IDataflowBlock)firstBlock).Fault(ex);
    throw; // or whatever is appropriate to propagate the exception up
}