如何处理来自匿名任务的异常
本文关键字:任务 异常 何处理 处理 | 更新日期: 2023-09-27 18:12:16
我有一个方法,它以块的形式从服务器提取数据,并返回数据进行处理。我做了一些测量,发现在后台下载块并通过BlockingCollection<T>
返回它们要快得多。这允许客户端和服务器同时工作,而不是互相等待。
public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();
Task.Run(
() =>
{
Boolean isMoreData = false;
do
{
// make server request and process response
// this block can throw
records.Add(response.record);
isMoreData = response.IsMoreData;
}
while (isMoreData);
records.CompleteAdding();
});
return records.GetConsumingEnumerable();
}
调用者(c++/CLI库)应该知道发生了异常,以便它可以再次尝试或根据需要退出。将异常传播给调用者,同时最小程度地更改返回类型的最佳方法是什么?
这就是为什么"立即走人"任务通常不是一个好主意。在您的情况下,它们甚至更糟糕,因为您没有在try/catch
中包装您的添加,records.CompleteAdding
在finally
块中,这意味着从GetConsumingEnumerable
调用枚举器上的MoveNext
最终将无限期地阻塞-这是坏的坏的坏的。
如果你完全在c#的范围内操作,解决方案很简单:更好地分离关注点。您剥离BlockingCollection
位并在它属于的地方运行它:在消费者(客户端)或中间流水线处理阶段(这是您最终要实现的),它将以一种方式设计,它仍然意识到生产者抛出的任何异常。然后您的GetData
签名保持不变,但它变成了一个简单的阻塞枚举,具有完整的异常传播:
public static IEnumerable<DataRecord> GetData(String serverAddress, Int64 dataID)
{
Boolean isMoreData = false;
do
{
// make server request and process response
// this block can throw
yield return response.record;
isMoreData = response.IsMoreData;
}
while (isMoreData);
}
然后管道看起来像这样:
var records = new BlockingCollection<DataRecord>();
var producer = Task.Run(() =>
{
try
{
foreach (var record in GetData("http://foo.com/Service", 22))
{
// Hand over the record to the
// consumer and continue enumerating.
records.Add(record);
}
}
finally
{
// This needs to be called even in
// exceptional scenarios so that the
// consumer task does not block
// indefinitely on the call to MoveNext.
records.CompleteAdding();
}
});
var consumer = Task.Run(() =>
{
foreach (var record in records.GetConsumingEnumerable())
{
// Do something with the record yielded by GetData.
// This runs in parallel with the producer,
// So you get concurrent download and processing
// with a safe handover via the BlockingCollection.
}
});
await Task.WhenAll(producer, consumer);
现在你可以鱼和熊掌兼得了:处理是并行发生的,因为记录是由GetData
产生的,await
在生产者任务中传播任何异常,而在finally
中调用CompleteAdding
确保你的消费者不会无限期地陷入阻塞状态。
由于您使用的是c++,因此上述方法在一定程度上仍然适用(也就是说,正确的做法是在c++中重新实现管道),但实现可能不那么漂亮,而且您使用的方式可能是首选的解决方案,即使由于未观察到的任务,它确实感觉像是一种hack。我实在想不出会出错的情况,因为CompleteAdding
总是因为新引入的try/catch
而被调用。
显然,另一个解决方案是将处理代码移动到您的c#项目中,这可能是可能的,也可能不是取决于您的体系结构。
我发现的最简单的解决方案是返回DataResult
上下文,该上下文可能在其记录被枚举后包含异常。
public class DataResult
{
internal DataResult(IEnumerable<DataRecord> records)
{
Records = records;
}
public IEnumerable<DataRecord> Records { get; private set; }
public Exception Exception { get; internal set; }
}
public static DataResult GetData(String serverAddress, Int64 dataID)
{
BlockingCollection<DataRecord> records = new BlockingCollection<DataRecord>();
DataResult result = new DataResult(records.GetConsumingEnumerable());
Task.Run(
() =>
{
try
{
Boolean isMoreData = false;
do
{
// make server request and process response
// this block can throw
records.Add(response.record);
isMoreData = response.IsMoreData;
}
while (isMoreData);
}
catch (Exception ex)
{
result.Exception = ex;
}
finally
{
records.CompleteAdding();
}
});
return result;
}
如果有异常,调用者(c++/CLI)可以重新抛出它。
void Caller()
{
DataResult^ result = GetData("http://foo.com/Service", 22);
foreach (DataRecord record in result->Records)
{
// process records
}
Exception^ ex = result->Exception;
if (ex != nullptr)
{
throw ex;
}
}