反应式扩展:我如何对可观察性进行分层

本文关键字:可观察性 分层 扩展 反应式 | 更新日期: 2023-09-27 18:20:44

我有一个发布通用消息的服务,我为这些消息创建了一个可观察的对象。这些消息可以包含任何内容,不同的协议可以在上面分层。

我希望添加第二个可观察层,以从这些消息中解释特定的协议。例如,消息的类型可以是"更新"、"错误"或"完成"。我希望重新发布"更新"消息,在"错误"上抛出错误,并在"完成"上完成序列。

我怎样才能做到这一点?

我认为我不能用SelectMany来做这件事;虽然选择器可以在前两种情况下返回Observable.Return()Observable.Throw(),但我无法从选择器中完成(调用observer.OnCompleted()并取消订阅底层可观察对象)。

在我看来,我必须使用Observable.Create()并订阅订阅方法中的底层可观察对象。我已经这样做了,但实现对我来说很奇怪,因为它没有使用Rx中更常见的函数组合风格。这是正确的方法吗?

public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
  return Observable.Create<Message>(observer =>
  {
    return stream.Subscribe(message =>
    {
      switch (ProtocolMessageTypeOf(message))
      {
        case ProtocolMessageType.Error:
          observer.OnError(new InvalidOperationException(message));
          break;
        case ProtocolMessageType.Complete:
          observer.OnCompleted();
          break;
        default:
          observer.OnNext(message);
          break;
      }
    });
  });
}

反应式扩展:我如何对可观察性进行分层

您可以尝试以下操作:

public IObservable<Message> InterpretProtocol(IObservable<message> stream) {
  return stream.
         TakeWhile(msg => ProtocolMessageTypeOf(message) != ProtocolMessageType.Complete).
         Select(msg => {
             if(ProtocolMessageTypeOf(message) == ProtocolMessageType.Error)
               throw new InvalidOperationException(message);
             else
               return msg;
        });
}