如何在IObservable管道中保存异常,并在结束时重新抛出它
本文关键字:结束 新抛出 异常 IObservable 管道 保存 | 更新日期: 2023-09-27 18:05:43
我有以下方法:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null);
}
实现了以下逻辑:
- 通过从命名空间管理器(
GetNamespaceConnectionInfoSource
)获取IObservable<NamespaceConnectionInfo>
进入monad。 - 当命名空间可用时,获取特定命名空间(
GetPolicySourceForNamespace
)对应的IObservable<DataManagementPolicy>
。但是,可以使用Merge
操作符来限制GetPolicySourceForNamespace
的并发呼叫数。 - 过滤掉坏的
DataManagementPolicy
记录(不能在SQL中完成)。 - 将看似良好的
DataManagementPolicy
记录转换为DataManagementWorkItem
实例。有些可能会变成null
,所以它们在最后被过滤掉。
GetNamespaceConnectionInfoSource
在产生一定数量的有效NamespaceConnectionInfo
对象后会发生故障。完全有可能在最后的可观测序列中已经产生了一定数量的DataManagementWorkItem
天体。
-
GetNamespaceConnectionInfoSource
在生成25个命名空间后抛出 -
GetPolicySourceForNamespace
在每个命名空间生成10个对象 - 并发限制是10
我也有兴趣在最终可观察对象出现故障之前检查它产生的项目:
var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
AssertTheRightException(exc);
}
workItems
集合每次都有不同数量的项目。有一次它有69项,另一次有50项,再一次有18项。
我的解释是,当故障发生时,在处理的各个阶段都有良好的NamespaceConnectionInfo
和DataManagementPolicy
对象,所有这些对象都因为故障而终止。每次的数量是不同的,因为项目是异步生产的。
我的问题就在这里——我不想让他们流产。我希望它们运行到完成,在最终的可观察序列中产生,然后才能通信故障。实际上,我想保留异常并在结束时重新抛出它。
我试着稍微修改一下实现:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
Exception fault = null;
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
fault = exc;
return Observable.Empty<NamespaceConnectionInfo>();
})
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Finally(() =>
{
if (fault != null)
{
throw fault;
}
});
}
不用说,它没有工作。Finally
似乎没有传播任何异常,我实际上同意。
那么,什么是实现我想要的正确方法呢?
编辑
与这个问题无关,我发现我用来收集生成的DataManagementWorkItem
实例的测试代码很糟糕。而不是
var obs = dm.GetWorkItemSource(10);
obs.Subscribe(wi => workItems.Add(wi));
await obs;
应该是
await dm.GetWorkItemSource(1).Do(wi => workItems.Add(wi));
不同之处在于,后者只订阅了一次条目源,而原始版本订阅了两次:
- /
Subscribe
- by
await
它不影响问题,但破坏了我的嘲弄代码。
这更像是一个澄清。每个名称空间产生一个包含10个策略对象的序列。但是这个过程是异步的——策略对象是按顺序异步生成的。在这段时间内,命名空间继续产生,因此在故障发生前给定25个命名空间,产生的命名空间可能有三种可能的"状态":
- 还没有为它生成策略对象,但是异步策略生成过程已经启动
- 一些(但少于10个)策略对象已经产生
- 名称空间的所有10个策略对象都已生成
当命名空间生成过程中出现错误时,不管"好的"命名空间现在处于什么"状态",整个管道都会被终止。
让我们看看下面这个简单的例子:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace observables
{
class Program
{
static void Main()
{
int count = 0;
var obs = Observable
.Interval(TimeSpan.FromMilliseconds(1))
.Take(50)
.Select(i =>
{
if (25 == Interlocked.Increment(ref count))
{
throw new Exception("Boom!");
}
return i;
})
.Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
.Merge(10);
var items = new HashSet<long>();
try
{
obs.Do(i => items.Add(i)).GetAwaiter().GetResult();
}
catch (Exception exc)
{
Debug.WriteLine(exc.Message);
}
Debug.WriteLine(items.Count);
}
}
}
当我运行它时,我通常有以下输出:
Boom!
192
但是,它也可以显示191。然而,如果我们应用故障连接解决方案(即使它在没有故障的情况下不起作用):
int count = 0;
var fault = new Subject<long>();
var obs = Observable
.Interval(TimeSpan.FromMilliseconds(1))
.Take(50)
.Select(i =>
{
if (25 == Interlocked.Increment(ref count))
{
throw new Exception("Boom!");
}
return i;
})
.Catch<long, Exception>(exc =>
{
fault.OnError(exc);
return Observable.Empty<long>();
})
.Select(i => Observable.Defer(() => Observable.Interval(TimeSpan.FromMilliseconds(1)).Take(10).Select(j => i * 1000 + j)))
.Merge(10)
.Concat(fault);
那么输出始终是240,因为我们让所有已经启动的异步进程完成。
基于pmccloghrylaing
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
var fault = new Subject<DataManagementWorkItem>();
bool faulted = false;
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
faulted = true;
return Observable.Throw<NamespaceConnectionInfo>(exc);
})
.Finally(() =>
{
if (!faulted)
{
fault.OnCompleted();
}
})
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
fault.OnError(exc);
return Observable.Empty<NamespaceConnectionInfo>();
})
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Concat(fault);
}
它在命名空间产生错误和成功时都可以工作,但是看起来很尴尬。加上多个订阅仍然共享故障。一定有一个更优雅的解决方案。
GetNamespaceConnectionInfoSource源代码
public IObservable<NamespaceConnectionInfo> GetNamespaceConnectionInfoSource(bool? isActive = null,
bool? isWorkflowEnabled = null, bool? isScheduleEnabled = null, bool? drainAndDisable = null,
IEnumerable<string> nsList = null, string @where = null, IList<SqlParameter> whereParameters = null)
{
IList<SqlParameter> parameters;
var sql = GetNamespaceConnectionInfoSqls.GetSql(isActive,
isWorkflowEnabled, isScheduleEnabled, drainAndDisable, nsList, @where, whereParameters, out parameters);
var sqlUtil = m_sqlUtilProvider.Get(m_siteSettings.ControlDatabaseConnString);
return sqlUtil.GetSource(typeof(NamespaceConnectionInfo), sqlUtil.GetReaderAsync(sql, parameters)).Cast<NamespaceConnectionInfo>();
}
public IObservable<DbDataReader> GetReaderAsync(string query, IList<SqlParameter> parameters = null, CommandBehavior commandBehavior = CommandBehavior.Default)
{
return Observable.FromAsync(async () =>
{
SqlCommand command = null;
try
{
var conn = await GetConnectionAsync();
command = GetCommand(conn, query, parameters);
return (DbDataReader)await command.ExecuteReaderAsync(commandBehavior | CommandBehavior.CloseConnection);
}
finally
{
DisposeSilently(command);
}
});
}
public IObservable<object> GetSource(Type objectType, IObservable<DbDataReader> readerTask)
{
return Observable.Create<object>(async (obs, ct) => await PopulateSource(objectType, await readerTask, true, obs, ct));
}
private static async Task PopulateSource(Type objectType, DbDataReader reader, bool disposeReader, IObserver<object> obs, CancellationToken ct)
{
try
{
if (IsPrimitiveDataType(objectType))
{
while (await reader.ReadAsync(ct))
{
obs.OnNext(reader[0]);
}
}
else
{
// Get all the properties in our Object
var typeReflector = objectType.GetTypeReflector(TypeReflectorCreationStrategy.PREPARE_DATA_RECORD_CONSTRUCTOR);
// For each property get the data from the reader to the object
while (await reader.ReadAsync(ct))
{
obs.OnNext(typeReflector.DataRecordConstructor == null ?
ReadNextObject(typeReflector, reader) :
typeReflector.DataRecordConstructor(reader));
}
}
}
catch (OperationCanceledException)
{
}
finally
{
if (disposeReader)
{
reader.Dispose();
}
}
}
调用m_namespaceManager.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
返回一个IObservable<NamespaceConnectionInfo>
。现在,任何单个可观察对象的契约是这样的:
OnNext*(OnError|OnCompleted)
这意味着您将得到零个或多个值,后面跟着一个,并且只有一个错误值或补全值。
你不能从一个可观察对象中获得多个错误,也不能在获得错误后获得值。
如果你的可观察对象没有返回一个以上的错误,它就违反了正常的Rx契约。
因此,根据现有的代码,你不可能将错误延迟到可观察对象的结束,因为错误是在可观察对象的结束。
你能做的是改变你在GetNamespaceConnectionInfoSource
中产生你的值的方式,这样它在合并它们之前会生成多个序列调用.Materialize()
。这意味着您将有一个IObservable<Notification<NamespaceConnectionInfo>>
,并且在整个流中可能有多个错误和完成。然后,您可以对该流进行分组,并在处理错误之前处理这些值。但这一切都取决于GetNamespaceConnectionInfoSource
的变化,因为你还没有发布源代码,我不能给你正确的代码。
var xs = new [] { 1, 2, 3, 0, 4, 0, 5 }.ToObservable();
xs
.Select(x =>
{
if (x == 0)
throw new NotSupportedException();
else
return x;
})
.Subscribe(
x => Console.WriteLine(x),
ex => Console.WriteLine(ex.ToString()));
结果如下:
1
2
3
System.NotSupportedException: Specified method is not supported.
at UserQuery.<Main>b__0(Int32 x) in query_ioaahp.cs:line 45
at System.Reactive.Linq.ObservableImpl.Select`2._.OnNext(TSource value)
4
&5
根本不会产生。
现在看看这段代码:
xs
.Select(x =>
Observable
.Start(() =>
{
if (x == 0)
throw new NotSupportedException();
else
return x;
})
.Materialize())
.Merge()
.Where(x => x.Kind != NotificationKind.OnCompleted)
.Subscribe(
x => Console.WriteLine(String.Format(
"{0} {1}",
x.Kind,
x.HasValue ? x.Value.ToString() : "")),
ex => Console.WriteLine(ex.ToString()));
生成以下内容:
OnNext 1
OnNext 4
OnError
OnError
OnNext 5
OnNext 3
OnNext 2
由于引入了并行性,它是无序的。
但是现在你可以处理所有的错误。
Concat
能解决你的问题吗?我已经用Finally
将其包装在Observable.Create
中,以完成faults
主题。
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
return Observable.Create<DataManagementWorkItem>((observer) =>
{
var faults = new Subject<DataManagementWorkItem>();
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
faults.OnError(exc);
return Observable.Empty<NamespaceConnectionInfo>();
})
.Take(maxConcurrentCalls)
.Select(nci => GetPolicySourceForNamespace(nci))
.Merge()
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Finally(() => faults.OnCompleted())
.Concat(faults)
.Subscribe(observer);
});
}
同样,这是否返回您所期望的?(24在你的测试)
m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch<NamespaceConnectionInfo, Exception>(exc =>
{
faults.OnError(exc);
return Observable.Empty<NamespaceConnectionInfo>();
})
.Count()
是的,基本问题是Merge
有一个失败的快速实现。如果源可观察对象产生错误,或者任何一个内部可观察对象产生错误,那么Merge
将使流失败,而不等待剩余的内部可观察对象完成。
要实现你想要的,你需要在merge看到错误之前"捕获"它,并在内部可观察对象完成后"重新抛出"它:
public IObservable<DataManagementWorkItem> GetWorkItemSource(int maxConcurrentCalls)
{
// wrap within Observable.Defer
// so that each new subscription
// gets its own Error subject
return Observable.Defer(() =>
{
var error = new ReplaySubject<DataManagementWorkItem>(1);
return m_namespaceManager
.GetNamespaceConnectionInfoSource(true, drainAndDisable: false)
.Catch(err =>
{
error.OnError(err);
return Observable.Empty<NamespaceConnectionInfo>();
})
.Finally(error.OnCompleted)
.Select(nci => Observable.Defer(() => GetPolicySourceForNamespace(nci)))
.Merge(maxConcurrentCalls)
.Where(IsValid)
.Select(ToWorkItem)
.Where(o => o != null)
.Concat(error);
});
}
另外,我注意到单元测试将两次订阅到返回的可观察对象,这增加了您的困惑。一次是调用Subscribe
来填充列表,另一次是调用await
。你真的只想订阅一次。我们可以使用.Do
运算符来填充列表,并且应该能够在错误处理程序中检查它:
var dm = DependencyResolver.Instance.GetInstance<IDataManagement>();
var workItems = new List<DataManagementWorkItem>();
try
{
var obs = dm.GetWorkItemSource(10).Do(workItems.Add);
await obs;
Assert.Fail("An expected exception was not thrown");
}
catch (Exception exc)
{
AssertTheRightException(exc);
// workItems should be populated.
}