RX FromEvent 方法中的异常处理
本文关键字:异常处理 方法 FromEvent RX | 更新日期: 2023-09-27 18:30:40
如何处理取消订阅消息处理程序时引发的异常
var rawSource = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
handler => ((sender, e) => handler(e)),
a => this._topicSubscribers.ForEach( s => s.MessageHandler += a ),
a => this._topicSubscribers.ForEach( s => s.MessageHandler -= a));
return rawSource;
在此代码中,有时我会收到来自消息处理程序的异常,如"非法状态异常:{"消费者已关闭"}"
事件通常不会引发,因此可能是源中的错误行为。如果可以在源头修复它,那就这样做。
否则,您将不得不捕获并吞下错误:
a => this._topicSubscribers.ForEach(s =>
{
try
{
s.MessageHandler += a;
}
catch
{
}
})
这可能不理想,或者只是不使用FromEvent
方法:
return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
{
EMSMessageHandler handler = (sender, e) =>
observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e)));
try
{
_topicSubscribers.ForEach(s => s.MessageHandler += handler);
}
catch (Exception ex)
{
try
{
_topicSubscribers.ForEach(s => s.MessageHandler -= handler);
}
catch { }
observer.OnError(ex);
}
return Disposable.Create(() =>
{
try
{
_topicSubscribers.ForEach(s => s.MessageHandler -= handler);
}
catch { }
});
});
请注意,Rx 需要序列化通知(Rx 设计指南中的 §4.2),因此必须确保所有_topicSubscribers
按顺序引发事件,而不是同时引发事件。如果不能,则必须同步所有调用以observer.OnNext
自己,可能通过获取锁。
更新:需要明确的是,无论您使用 FromEvent
还是Create
,都需要序列化,因此即使您选择简单地吞下像我的第一个示例一样的异常,您仍然需要确保源永远不会同时引发事件; 如果你不能,那么你无论如何都被迫使用我的Create
示例和锁。 FromEvent
不会为你做这件事。
这种使用这样的FromEvent
是自找麻烦,因为 Dave 引用了 Rx 中需要序列化的所有原因。
但是,假设事件不是在每个事件源中同时引发的(我相信EMS MessageConsumer就是这种情况),我会在FromEvent
之后而不是在其中进行聚合,并让Rx完成繁重的工作:
var sources = new List<IObservable<EMSMessageEventArgs>();
foreach(var topicSubscriber in this._topicSubscribers.ToList())
{
var source = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
handler => ((sender, e) => handler(e)),
h => topicSubscriber.MessageHandler += h,
h => topicSubscriber.MessageHandler -= h)
.Synchronize();
}
rawSource = sources.Merge();
这样,Merge
将负责正确聚合和序列化各个源 - 但是,各个事件中仍可能存在并发性。实际上,我不认为FromEvent
受到个别来源中同时提出的事件的压力。但是,Merge
可能不那么宽容,在这种情况下,使用上述Sychronize()
可确保在单个事件源级别以及跨事件源进行序列化。