如何在故障块上停止处理管道
本文关键字:处理 管道 故障 | 更新日期: 2023-09-27 17:57:24
如果其中一个块决定发生错误,我如何停止处理数据流块,从而阻止下一个块运行。我认为块可以引发异常,但不确定停止进一步处理管道的正确方法是什么。
更新:
private async void buttonDataFlow_Click(object sender, EventArgs e)
{
var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
if (cells == null)
return;
var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });
foreach (Cell c in cells)
{
var progressHandler = new Progress<string>(value =>
{
c.Status = value;
});
c.Progress = progressHandler as IProgress<string>;
blockPrepare.Post(c);
};
blockPrepare.Complete();
try
{
await blockTestMover.Completion;
}
catch(Exception ee)
{
Console.WriteLine(ee.Message);
}
Console.WriteLine("Done");
}
更新 2:
public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
Func<TInput, Task> action,
Action<Exception> exceptionHandler,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return new ActionBlock<TInput>(async input =>
{
try
{
await action(input);
}
catch (Exception ex)
{
exceptionHandler(ex);
}
}, dataflowBlockOptions);
}
如果你想要的是块中的异常意味着当前项目确实在管道中走得更远,但其他项目的处理应该继续而不会中断,那么你可以通过创建一个块来实现这一点,如果处理成功,则生成一个项目,但在引发异常时生成零个项目:
public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
Action<Exception> exceptionHandler,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return new TransformManyBlock<TInput, TOutput>(async input =>
{
try
{
var result = await transform(input);
return new[] { result };
}
catch (Exception ex)
{
exceptionHandler(ex);
return Enumerable.Empty<TOutput>();
}
}, dataflowBlockOptions);
}
如果您有管道,则可能已经在使用PropagateCompletion = true
.这意味着,如果管道中的一个块因异常而失败,则它之后的所有块也将失败。
剩下的就是停止失败块之前的所有块。为此,您可以等待管道中最后一个块的Completion
。如果这样做会抛出,则通过调用第一个块来使第一个块失败Fault()
。代码可能如下所示:
// set up your pipeline
try
{
await lastBlock.Completion;
}
catch (Exception ex)
{
((IDataflowBlock)firstBlock).Fault(ex);
throw; // or whatever is appropriate to propagate the exception up
}