忽略Observable中的异常并继续

本文关键字:继续 异常 Observable 忽略 | 更新日期: 2023-09-27 18:22:06

考虑这个可观察的:

_listener = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes().ToObservable()
            .SelectMany(CreateUdpListener, CreateMessage)
            .OfType<DeviceMessage>()
            .SelectMany(InjectTestMode)
            .OfType<DeviceMessage>()
            .Do(async message => await PublishMessage(message)))
            .Retry()
            .Subscribe(OnMessageReceive, OnError, OnComplete);

除非在CreateMessageInjectTestMode中引发异常,否则此操作正常。

我希望Observable跳过生成异常的序列中的项目并继续。

我读过关于Catch的文章,但我发现的例子可以让你开始一个新的Observable,我想继续我现有的例子。目前,整个序列重新启动,其中包括UDP端口,如果可能的话,我希望避免这些端口。

[更新]

我和一位同事重读了一些关于使用IEnumerable<IObservable<>>IObservable<IObservable<>>的评论,并提出了这个有效的方法!但这是正确的/最佳实践吗?

如果内部可观察到异常,我想知道它是否只会丢弃飞行中的ReceiveAsync事件中的数据包。

var listeners = Observable.Defer(() => _deviceTypeProvider.GetDeviceTypes()
                .ToObservable()
                .Select(UdpListener)
                .SelectMany(listener =>
                {
                  return Observable.Defer(() => Observable
                    .FromAsync(listener.UdpClient.ReceiveAsync)
                    .Where(x => x.Buffer.Length > 0)
                    .Repeat()
                    .Select(result => CreateMessage(listener.DeviceType, result))
                    .SelectMany(InjectTestMode)
                    .OfType<DeviceMessage>()
                    .Do(async message => await PublishMessage(message)))
                    .Retry();
                })).Retry();
_listener = listeners.Subscribe(OnMessageReceive, OnError, OnComplete);

忽略Observable中的异常并继续

IObservable<T>的文档指定序列必须与以下语法匹配:

OnNext* (OnCompleted|OnError)

异常或完成后不能再发出任何值。如果手动创建违反此语法的可观察项,则在使用任何现有Rx运算符时,可能会出现未定义的行为。不好!

如果希望获得重试行为,则将查询建模为IEnumerable<IObservable<T>>IObservable<IObservable<T>>,其中外部IEnumerable<*>IObservable<*>从不抛出。

制作一些静态扩展函数,将委托传递给它,并将它封装到该函数内的try{}catch{}。