RX FromEvent 方法中的异常处理

本文关键字:异常处理 方法 FromEvent RX | 更新日期: 2023-09-27 18:30:40

如何处理取消订阅消息处理程序时引发的异常

var rawSource = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
            handler => ((sender, e) => handler(e)),
            a => this._topicSubscribers.ForEach( s => s.MessageHandler += a ),
            a => this._topicSubscribers.ForEach( s => s.MessageHandler -= a));
        return rawSource;

在此代码中,有时我会收到来自消息处理程序的异常,如"非法状态异常:{"消费者已关闭"}"

RX FromEvent<T> 方法中的异常处理

事件通常不会引发,因此可能是源中的错误行为。如果可以在源头修复它,那就这样做。

否则,您将不得不捕获并吞下错误:

a => this._topicSubscribers.ForEach(s => 
{
  try
  {
    s.MessageHandler += a;
  }
  catch
  {
  }
})

这可能不理想,或者只是不使用FromEvent方法:

return Observable.Create<EventPattern<EMSMessageEventArgs>>(observer =>
{
  EMSMessageHandler handler = (sender, e) => 
    observer.OnNext(new EventPattern<EMSMessageEventArgs>(sender, e)));
  try
  {
    _topicSubscribers.ForEach(s => s.MessageHandler += handler);
  }
  catch (Exception ex)
  {
    try
    {
      _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
    }
    catch { }
    observer.OnError(ex);
  }
  return Disposable.Create(() =>
  {
    try
    {
      _topicSubscribers.ForEach(s => s.MessageHandler -= handler);
    }
    catch { }
  });
});

请注意,Rx 需要序列化通知(Rx 设计指南中的 §4.2),因此必须确保所有_topicSubscribers按顺序引发事件,而不是同时引发事件。如果不能,则必须同步所有调用以observer.OnNext自己,可能通过获取锁。

更新:需要明确的是,无论您使用 FromEvent 还是Create,都需要序列化,因此即使您选择简单地吞下像我的第一个示例一样的异常,您仍然需要确保源永远不会同时引发事件; 如果你不能,那么你无论如何都被迫使用我的Create示例和锁。 FromEvent不会为你做这件事。

这种使用这样的FromEvent是自找麻烦,因为 Dave 引用了 Rx 中需要序列化的所有原因。

但是,假设事件不是在每个事件源中同时引发的(我相信EMS MessageConsumer就是这种情况),我会在FromEvent之后而不是在其中进行聚合,并让Rx完成繁重的工作:

var sources = new List<IObservable<EMSMessageEventArgs>();     
foreach(var topicSubscriber in this._topicSubscribers.ToList())
{
    var source = Observable.FromEvent<EMSMessageHandler, EMSMessageEventArgs>(
        handler => ((sender, e) => handler(e)),
        h => topicSubscriber.MessageHandler += h,
        h => topicSubscriber.MessageHandler -= h)
        .Synchronize();
}
rawSource = sources.Merge();

这样,Merge将负责正确聚合和序列化各个源 - 但是,各个事件中仍可能存在并发性。实际上,我不认为FromEvent受到个别来源中同时提出的事件的压力。但是,Merge可能不那么宽容,在这种情况下,使用上述Sychronize()可确保在单个事件源级别以及跨事件源进行序列化。