在这种情况下,实现 ObservableBase 是最好的还是有其他方法

本文关键字:方法 其他 这种情况下 实现 ObservableBase | 更新日期: 2023-09-27 17:54:29

首先,我没有找到自定义实现ObservableBase或AnonymousObservable的好例子。我不知道我需要在我的情况下实施哪一个(如果有的话(。情况是这样的。

我使用第三方库,有一个类,我们称之为Producer,它允许我在其上设置一个委托,如objProducer.Attach(MyHandler(。MyHandler 将接收来自生产者的消息。我正在尝试围绕生产者创建一个包装器,以使其可观察,理想情况下使其成为一种独特的类型,而不是仅创建可观察的实例(如 Observable.Create(。

已编辑:第三方制作人具有以下界面

public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
   public void Start();
   public void Attach(ProducerMessageHandler fnHandler);
   public void Dispose();
}

正如我提到的,我无法控制它的源代码。它打算像这样使用:创建一个实例,调用 Attach 并传递一个委托,调用 Start,它基本上在生产者收到或生成消息时启动在提供的委托内接收消息。

我正在考虑创建公共class ProducerObservable : ObservableBase<Message>以便当有人订阅它时,我会(Rx库会(将消息推送给观察者。似乎我需要在我的 ProducerObservable 的构造函数中的某个地方调用 Attach ,然后我需要以某种方式在附加到它的观察器上调用 OnNext。这是否意味着我必须编写所有这些代码:向类添加LinkedList<IObserver<Message>>观察者列表,然后在 ProducerObservable 上调用 SubscribeCore 抽象方法时添加观察器?然后显然我将能够在MyHandler中枚举LinkedList<IObserver<Message>>并为每个调用OnNext。所有这些看起来都是可行的,但感觉并不完全正确。我希望 .net 反应式扩展能够更好地为这种情况做好准备,并且至少在基类的某个地方准备好LinkedList<IObserver<Message>>的实现。

在这种情况下,实现 ObservableBase 是最好的还是有其他方法

在使用 Rx 的代码中,"Producer"对象通常是通过公共属性或方法公开IObservable<T>实例的对象。 Producer类本身实现IObservable<T>的情况不太常见,当它这样做时,它通过使用Rx来做引擎盖下的繁重工作。 您绝对不想自己实现IObservable<T>

下面是一个示例,其中可观察量作为属性公开:

public class Producer
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();
            return Disposable.Empty;
        }).Publish();
        // Connect the observable the first time someone starts
        // observing
        Stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }
            return subscription;
        });
    }
    private int _connected;
    public IObservable<Message> Stream { get; private set; }
}

下面是同一个示例,我们通过委托给 Rx 来实际实现IObservable<T>

public class Producer : IObservable<Message>
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();
            return Disposable.Empty;
        }).Publish();
        // Connect the observable the first time someone starts
        // observing
        _stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }
            return subscription;
        });
    }
    private IObservable<Message> _stream;
    // implement IObservable<T> by delegating to Rx
    public IDisposable Subscribe(IObserver<Message> observer)
    {
        return _stream.Subscribe(observer);
    }
}

以下是你应该做的事情,以保持Rx"友好":

public static class ObservableProducer
{
    public static IObservable<Message> Create()
    {
        return 
            Observable.Using(() => new Producer(), p =>
                Observable.Create<Message>(o => 
                {
                    ProducerMessageHandler handler = m => o.OnNext(m);
                    p.Attach(handler);
                    return Disposable.Create(() => o.OnCompleted());
                }));
    }
}

你会像这样使用它:

IObservable<Message> query = ObservableProducer.Create();

应允许为所有新订阅创建多个Producer实例 - 这就是 Rx 的工作方式。

但是,如果您只需要一个Producer实例,请查看在此可观察对象上使用.Publish()

下面介绍如何确保单个Producer实例是"自我管理"的:

IObservable<Message> query = ObservableProducer.Create().Publish().RefCount();

这将在第一个订阅上创建一个Producer实例,并保留该Producer,直到不再有任何订阅。这使得它成为"自我管理"和更好的解决方案,而不是滚动自己的课程。

如果你必须实现自己的类,那么你经常会犯错误。您作为此问题的答案添加的类有三个我可以看到的。

  1. 附加消息处理程序后实例化主题。如果生成者在附加过程中创建消息,则代码将失败。
  2. 您不会跟踪订阅。如果您不跟踪订阅,则无法处置它们。Rx 查询可以保留打开的昂贵资源,因此应尽早处置它们。
  3. 在处置生产者之前,您不会就此问题致电.OnCompleted()

这是我对你的类的实现:

public class ProducerObservable : IObservable<Message>, IDisposable
{
    private readonly Producer _Producer;
    private readonly Subject<Message> _Subject;
    private readonly CompositeDisposable _Disposables;
    public ProducerObservable()
    {
        _Subject = new Subject<Message>();
        ProducerMessageHandler fnHandler = m => _Subject.OnNext(m);
        _Producer = new Producer();
        _Producer.Attach(fnHandler);
        _Producer.Start();
        _Disposables = new CompositeDisposable();
        _Disposables.Add(_Producer);
        _Disposables.Add(_Subject);
    }
    public void Dispose()
    {
        _Subject.OnCompleted();
        _Disposables.Dispose();
    }
    public IDisposable Subscribe(IObserver<Message> objObserver)
    {
        var subscription = _Subject.Subscribe(objObserver);
        _Disposables.Add(subscription);
        return subscription;
    }
}

我还是不喜欢。在撰写本文时,我是[system.reactive]中拥有银牌的三个人之一(还没有人拥有金牌(,我从未实现过自己的可观察性。我只是刚刚意识到我没有打电话给.OnCompleted()这个主题,所以我回去编辑了上面的代码。这是一个雷区。依靠内置运算符要好得多。

ObservableBase存在的原因是为了帮助防止人们犯错误,但它并不能阻止它。

这个讨论只是给了我一个想法。不就是这个吗?

public class ProducerObservable : IObservable<Message>, IDisposable {
   private readonly Producer _Producer;
   private readonly Subject<Message> _Subject;
   public ProducerObservable() {
      _Produder = new Producer();
      _Producer.Attach(Message_Received);
      _Subject = new Subject<Message>();
      _Producer.Start();
   }
   public void Dispose() {
      _Producer.Dispose();
      _Subject.Dispose();
   }
   public IDisposable Subscribe(IObserver<Message> objObserver) {
      return _Subject.Subscribe(objObserver);
   }
   private void Message_Received(Message objMessage) {
      _Subject.OnNext(objMessage);
   }
}

因此,在我看来,我们避免了额外的级别,额外的可观察量,只有一个可观察量类型,基本上我只看到优点而没有缺点。