如何处理来自匿名任务的异常

本文关键字:任务 异常 何处理 处理 | 更新日期: 2023-09-27 18:12:16

我有一个方法,它以块的形式从服务器提取数据,并返回数据进行处理。我做了一些测量,发现在后台下载块并通过BlockingCollection<T>返回它们要快得多。这允许客户端和服务器同时工作,而不是互相等待。

public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
    BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();
    Task.Run(
        () =>
        {
            Boolean isMoreData = false;
            do
            {
                // make server request and process response
                // this block can throw
                records.Add(response.record);
                isMoreData = response.IsMoreData;
            }
            while (isMoreData);
            records.CompleteAdding();
        });
    return records.GetConsumingEnumerable();
}

调用者(c++/CLI库)应该知道发生了异常,以便它可以再次尝试或根据需要退出。将异常传播给调用者,同时最小程度地更改返回类型的最佳方法是什么?

如何处理来自匿名任务的异常

这就是为什么"立即走人"任务通常不是一个好主意。在您的情况下,它们甚至更糟糕,因为您没有在try/catch中包装您的添加,records.CompleteAddingfinally块中,这意味着从GetConsumingEnumerable调用枚举器上的MoveNext最终将无限期地阻塞-这是坏的坏的坏的。

如果你完全在c#的范围内操作,解决方案很简单:更好地分离关注点。您剥离BlockingCollection位并在它属于的地方运行它:在消费者(客户端)或中间流水线处理阶段(这是您最终要实现的),它将以一种方式设计,它仍然意识到生产者抛出的任何异常。然后您的GetData签名保持不变,但它变成了一个简单的阻塞枚举,具有完整的异常传播:

public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
    Boolean isMoreData = false;
    do
    {
        // make server request and process response
        // this block can throw
        yield return response.record;
        isMoreData = response.IsMoreData;
    }
    while (isMoreData);
}

然后管道看起来像这样:

var records = new BlockingCollection<DataRecord>();
var producer = Task.Run(() =>
{
    try
    {
        foreach (var record in GetData("http://foo.com/Service", 22))
        {
            // Hand over the record to the
            // consumer and continue enumerating.
            records.Add(record);
        }
    }
    finally
    {
        // This needs to be called even in
        // exceptional scenarios so that the
        // consumer task does not block
        // indefinitely on the call to MoveNext.
        records.CompleteAdding();
    }
});
var consumer = Task.Run(() =>
{
    foreach (var record in records.GetConsumingEnumerable())
    {
        // Do something with the record yielded by GetData.
        // This runs in parallel with the producer,
        // So you get concurrent download and processing
        // with a safe handover via the BlockingCollection.
    }
});
await Task.WhenAll(producer, consumer);

现在你可以鱼和熊掌兼得了:处理是并行发生的,因为记录是由GetData产生的,await在生产者任务中传播任何异常,而在finally中调用CompleteAdding确保你的消费者不会无限期地陷入阻塞状态。

由于您使用的是c++,因此上述方法在一定程度上仍然适用(也就是说,正确的做法是在c++中重新实现管道),但实现可能不那么漂亮,而且您使用的方式可能是首选的解决方案,即使由于未观察到的任务,它确实感觉像是一种hack。我实在想不出会出错的情况,因为CompleteAdding总是因为新引入的try/catch而被调用。

显然,另一个解决方案是将处理代码移动到您的c#项目中,这可能是可能的,也可能不是取决于您的体系结构。

我发现的最简单的解决方案是返回DataResult上下文,该上下文可能在其记录被枚举后包含异常。

public class DataResult
{
    internal DataResult(IEnumerable<DataRecord> records)
    {
        Records = records;
    }
    public IEnumerable<DataRecord> Records { get; private set; }
    public Exception Exception { get; internal set; }
}
public static DataResult GetData(String serverAddress, Int64 dataID)
{
    BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();
    DataResult result = new DataResult(records.GetConsumingEnumerable());
    Task.Run(
        () =>
        {
            try
            {
                Boolean isMoreData = false;
                do
                {
                    // make server request and process response
                    // this block can throw
                    records.Add(response.record);
                    isMoreData = response.IsMoreData;
                }
                while (isMoreData);
            }
            catch (Exception ex)
            {
                result.Exception = ex;
            }
            finally
            {
                records.CompleteAdding();
            }
        });
    return result;
}

如果有异常,调用者(c++/CLI)可以重新抛出它。

void Caller()
{
    DataResult^ result = GetData("http://foo.com/Service", 22);
    foreach (DataRecord record in result->Records)
    {
        // process records
    }
    Exception^ ex = result->Exception;
    if (ex != nullptr)
    {
        throw ex;
    }
}