Rx扩展:如何使一个订阅依赖于另一个订阅

本文关键字:依赖于 另一个 何使一 扩展 Rx | 更新日期: 2023-09-27 18:02:12

我有一个类,在它的构造函数中接受一个可观察对象,然后订阅它并做一些事情,设置属性等。类本身是可观察的。

我想订阅我的源可观察对象,只有当有人订阅了我的类,但我不知道如何做到这一点。

public MyClass : IObservable<MyResult>
{
    private readonly Subject<MyResult> _subject = new Subject<MyResult>();
    private readonly IConnectableObservable<MySource> _source;
    public MyClass(IObservable<MySource> source)
    {
         _source = source
             //All my logic to set properties and such
             //goes here as a side effect, instead of in a subscription...
             .Do(...)
             //I hope that by publishing, side effects will happen only once...
             .Publish();
    }
    public IDisposable Subscribe(IObserver<MyResult> observer)
    {
        return new CompositeDisposable(
             _source.Subscribe(/* 
                  don't have anything to do here,
                  just subscribing to make sure I'm subscribed to source...
                  (this can't be the right way to do it)
             */),
             _subject.Subscribe(observer));
    }
}

更新

@Scott:我明白为什么实现IObservable会是一个反模式。My Class需要消耗单个可观察对象,并暴露3作为属性(最初最常用的可观察对象将由MyClass本身返回,但我认为将其作为属性可能会更好。

我想写的是一个可观察的命令。我知道有一些是存在的,但这更像是一种学习Rx的方法…

public class ObservableCommand<T> : ICommand
{
    private readonly ISubject<T> _executeRequests = new Subject<T>();
    private readonly ISubject<T> _canExecuteRequests = new Subject<T>();
    public IObservable<bool> CanExecuteChanges { get; private set; }
    public IObservable<T> CanExecuteRequests { get; private set; }
    public IObservable<T> ExecuteRequests { get; private set; }
    public ObservableCommand(IObservable<bool> canExecute)
    {
        var source = canExecute.DistinctUntilChanged()
        //How do I dispose of subscription later?
        //I have this fear that I'm going to have a chain of references, 
        //and my entire app will never get GC'd!
        var subscription = source.Subscribe(
            o => {
                if (CanExecuteChanged != null)
                    CanExecuteChanged(this, EventArgs.Empty);
            });
        CanExecuteChanges = source;
        CanExecuteRequests = _canExecuteRequests.AsObservable();
        ExecuteRequests = _executeRequests.AsObservable();
    }
    #region ICommand Members
    public bool  CanExecute(object parameter)
    {
        _canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }
    public event EventHandler  CanExecuteChanged;
    public void  Execute(object parameter)
    {
        _executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
    }
    #endregion
}

Rx扩展:如何使一个订阅依赖于另一个订阅

在构造函数中不使用DoPublish,而是使用Subscribe方法如何?

应该说,显式实现IObservable<T>是一种Rx反模式。

您可以使用DeferCreate使订阅依赖于其他订阅者,例如

IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect =  Observable.Defer(() =>
{
   // Do something interesting on Subscription
   // ....
   return source;
});

我为你准备了一段剪报。MyClass实现了IObservable<T>,也有IObserver<T>的方法,但是它们都是私有的。有了额外的OnInitializeOnSubscribe,你应该能够在任何你想要响应的事件上做任何你想做的事情。

如果你想让这个片段可重用,你可以将所有方法定义为partial,因为它们都返回void。然后你可以创建任何你想要的定义。

public class MyClass<T> : IObservable<T>
{
    private readonly IObservable<T> m_Source;
    public MyClass(IObservable<T> source)
    {
        if (source == null) throw new ArgumentNullException("source");
        m_Source = source.Do(OnNext, OnError, OnCompleted);
        OnInitialize();
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        OnSubscribe();
        return m_Source.Subscribe(observer);
    }
    private void OnInitialize()
    {
        Console.WriteLine("OnInitialize");
    }
    private void OnSubscribe()
    {
        Console.WriteLine("OnSubscribe");
    }
    private void OnNext(T value)
    {
        Console.WriteLine("OnNext: {0}", value);
    }
    private void OnError(Exception error)
    {
        Console.WriteLine("OnError: {0}", error.Message);
    }
    private void OnCompleted()
    {
        Console.WriteLine("OnCompleted");
    }    
}