如何实现从一个IObserver到另一个IObserver的原子开关

本文关键字:IObserver 另一个 开关 何实现 实现 一个 | 更新日期: 2023-09-27 17:59:10

我有一个IObservable<byte[]>,我使用一些中间步骤将其转换为IObservable<XDocument>

var observedXDocuments =
    from b in observedBytes
    // Lot of intermediate steps to transform byte arrays into XDocuments
    select xDoc;

在某个时间点,我对观察到的XDocument感兴趣,所以我订阅了一个IObserver<XDocument>。稍后,我想订阅另一个IObserver<XDocument>并处理旧的。

如何在一次原子操作中执行此操作,而不会丢失任何观察到的XDocument?我可以做这样的事情:

oldObserver.Dispose();
observedXDocuments.Subscribe(newObserver);
不过,我

担心,在这两个电话之间,我可能会失去XDocument。如果我切换两个呼叫,则可能会两次收到相同的XDocument

如何实现从一个IObserver到另一个IObserver的原子开关

我可能会添加一层间接层。 编写一个名为 ExchangeableObserver 的类,将其订阅到您的可观察量,并使其永久订阅。 ExchangeableObserver 的工作是将所有内容委托给给定的子观察者。 但是程序员可以随时更改被委派给的子观察者。 在我的示例中,我有一个 Exchange(( 方法。 像这样:

public class ExchangeableObserver<T> : IObserver<T> {
  private IObserver<T> inner;
  public ExchangeableObserver(IObserver<T> inner) {
    this.inner=inner;
  }
  public IObserver<T> Exchange(IObserver<T> newInner) {
    return Interlocked.Exchange(ref inner, newInner);
  }
  public void OnNext(T value) {
    inner.OnNext(value);
  }
  public void OnCompleted() {
    inner.OnCompleted();
  }
  public void OnError(Exception error) {
    inner.OnError(error);
  }
}

您可以使用信号量,使 Shure 在IObservable<byte[]>IObservable<XDocument>做准备时不会发生观察者更改。

伪代码如何做到这一点(不是测试(

  System.Threading.ReaderWriterLockSlim criticalSection 
       = new System.Threading.ReaderWriterLockSlim(...);  

  ... converting from `IObservable<byte[]>` to `IObservable<XDocument>`  
  criticalSection.EnterReadLock();
  Call IObservable<XDocument>
  criticalSection.ExitReadLock();
  .... replacing IObservable<XDocument>
  criticalSection.EnterWriteLock();
  Call change IObservable<XDocument>
  criticalSection.ExitWriteLock();

编辑:带Call IObservable<XDocument>

  > What exactly do you mean with the line `Call IObservable<XDocument>`?

我解释你的判决

  > I have an `IObservable<byte[]>` that I transform 
  > into an `IObservable<XDocument>` using some intermediate steps...

您已为 IObservable<byte[]> 注册了一个事件处理程序,该处理程序从byte[]创建XDocument,然后调用触发IObservable<XDocument>事件的东西.

Call IObservable<XDocument>表示触发后续事件的代码