数据流管道中的全局逐块错误处理
本文关键字:错误 处理 全局 管道 数据流 | 更新日期: 2023-09-27 18:07:21
我正在设计一个由多个块组成的长时间运行的数据流管道。项目被馈送到管道的输入块,最终通过它,并在最后显示在UI中(作为对用户的礼貌——管道的真正工作是将处理结果保存到磁盘)。
管道块中的lambda函数可能会抛出异常,原因有很多(输入错误、网络故障、计算过程中的错误,等等)。在这种情况下,我不想让整个管道出错,而是想踢出有问题的项,并将其显示在UI的"Errors"下。
最好的方法是什么?我知道我可以在try/catch中包装每个lambda函数:
var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...)
var workerBlock = new TransformBlock<WorkItem, WorkItem>(item =>
{
try {
return DoStuff(item);
} catch (Exception ex) {
errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
return null;
}
}
但是我在管道中有大约10个块,将代码复制/粘贴到每个块中似乎很愚蠢。另外,我不喜欢返回null的想法,因为现在所有的下游块都必须检查它。
我的下一个最好的想法是创建一个函数,返回一个lambda,为我做包装:
private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f) where TArg:WorkItem
{
return arg =>
{
try {
return f(arg);
} catch (Exception ex) {
errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
return default(TResult);
}
};
}
但这似乎有点太元了。有没有更好的办法?
这是一个非常有趣的话题。
你可以在链接块时定义过滤器,这意味着你可以将错误结果转移到错误处理块。要做到这一点,块应该返回"元"对象,其中包含处理结果和至少一个失败/成功指示符。
这个想法在面向铁路的编程中有更好的描述,其中链中的每个函数处理成功的结果或将失败的结果转移到"失败的轨道"以最终记录。
实际上,这意味着您应该在每个块之后添加两个链接:一个带有过滤器条件,用于转到错误处理块,另一个默认链接用于转到流中的下一步。
您甚至可以结合这两个想法来处理部分故障。部分故障结果将同时包含故障指示器和有效负载。您可以在将结果传递到下一步之前将其转移到日志块。
我发现更容易明确每个消息的状态,而不是试图通过检查null,缺失值等来确定其状态。这意味着块应该将它们的结果包装在包含状态标志、结果和/或任何错误的"信封"对象中。