忽略Observable中的异常并继续
本文关键字:继续 异常 Observable 忽略 | 更新日期: 2023-09-27 18:22:06
考虑这个可观察的:
_listener = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes().ToObservable()
.SelectMany(CreateUdpListener, CreateMessage)
.OfType<DeviceMessage>()
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry()
.Subscribe(OnMessageReceive, OnError, OnComplete);
除非在CreateMessage
或InjectTestMode
中引发异常,否则此操作正常。
我希望Observable跳过生成异常的序列中的项目并继续。
我读过关于Catch的文章,但我发现的例子可以让你开始一个新的Observable,我想继续我现有的例子。目前,整个序列重新启动,其中包括UDP端口,如果可能的话,我希望避免这些端口。
[更新]
我和一位同事重读了一些关于使用IEnumerable<IObservable<>>
或IObservable<IObservable<>>
的评论,并提出了这个有效的方法!但这是正确的/最佳实践吗?
如果内部可观察到异常,我想知道它是否只会丢弃飞行中的ReceiveAsync
事件中的数据包。
var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
.ToObservable()
.Select(UdpListener)
.SelectMany(listener =>
{
return Observable.Defer(() => Observable
.FromAsync(listener.UdpClient.ReceiveAsync)
.Where(x => x.Buffer.Length > 0)
.Repeat()
.Select(result => CreateMessage(listener.DeviceType, result))
.SelectMany(InjectTestMode)
.OfType<DeviceMessage>()
.Do(async message => await PublishMessage(message)))
.Retry();
})).Retry();
_listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);
IObservable<T>
的文档指定序列必须与以下语法匹配:
OnNext* (OnCompleted|OnError)
异常或完成后不能再发出任何值。如果手动创建违反此语法的可观察项,则在使用任何现有Rx运算符时,可能会出现未定义的行为。不好!
如果希望获得重试行为,则将查询建模为IEnumerable<IObservable<T>>
或IObservable<IObservable<T>>
,其中外部IEnumerable<*>
或IObservable<*>
从不抛出。
制作一些静态扩展函数,将委托传递给它,并将它封装到该函数内的try{}catch{}。