Rx扩展:如何使一个订阅依赖于另一个订阅
本文关键字:依赖于 另一个 何使一 扩展 Rx | 更新日期: 2023-09-27 18:02:12
我有一个类,在它的构造函数中接受一个可观察对象,然后订阅它并做一些事情,设置属性等。类本身是可观察的。
我想订阅我的源可观察对象,只有当有人订阅了我的类,但我不知道如何做到这一点。
public MyClass : IObservable<MyResult>
{
private readonly Subject<MyResult> _subject = new Subject<MyResult>();
private readonly IConnectableObservable<MySource> _source;
public MyClass(IObservable<MySource> source)
{
_source = source
//All my logic to set properties and such
//goes here as a side effect, instead of in a subscription...
.Do(...)
//I hope that by publishing, side effects will happen only once...
.Publish();
}
public IDisposable Subscribe(IObserver<MyResult> observer)
{
return new CompositeDisposable(
_source.Subscribe(/*
don't have anything to do here,
just subscribing to make sure I'm subscribed to source...
(this can't be the right way to do it)
*/),
_subject.Subscribe(observer));
}
}
更新@Scott:我明白为什么实现IObservable会是一个反模式。My Class
需要消耗单个可观察对象,并暴露3作为属性(最初最常用的可观察对象将由MyClass
本身返回,但我认为将其作为属性可能会更好。
我想写的是一个可观察的命令。我知道有一些是存在的,但这更像是一种学习Rx的方法…
public class ObservableCommand<T> : ICommand
{
private readonly ISubject<T> _executeRequests = new Subject<T>();
private readonly ISubject<T> _canExecuteRequests = new Subject<T>();
public IObservable<bool> CanExecuteChanges { get; private set; }
public IObservable<T> CanExecuteRequests { get; private set; }
public IObservable<T> ExecuteRequests { get; private set; }
public ObservableCommand(IObservable<bool> canExecute)
{
var source = canExecute.DistinctUntilChanged()
//How do I dispose of subscription later?
//I have this fear that I'm going to have a chain of references,
//and my entire app will never get GC'd!
var subscription = source.Subscribe(
o => {
if (CanExecuteChanged != null)
CanExecuteChanged(this, EventArgs.Empty);
});
CanExecuteChanges = source;
CanExecuteRequests = _canExecuteRequests.AsObservable();
ExecuteRequests = _executeRequests.AsObservable();
}
#region ICommand Members
public bool CanExecute(object parameter)
{
_canExecuteRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
public event EventHandler CanExecuteChanged;
public void Execute(object parameter)
{
_executeRequests.OnNext(parameter is T ? (T)parameter : default(T));
}
#endregion
}
在构造函数中不使用Do
或Publish
,而是使用Subscribe
方法如何?
IObservable<T>
是一种Rx反模式。您可以使用Defer
和Create
使订阅依赖于其他订阅者,例如
IObservable<MySource> source;
IObservable<MySource> sourceWithSubSideEffect = Observable.Defer(() =>
{
// Do something interesting on Subscription
// ....
return source;
});
我为你准备了一段剪报。MyClass
实现了IObservable<T>
,也有IObserver<T>
的方法,但是它们都是私有的。有了额外的OnInitialize
和OnSubscribe
,你应该能够在任何你想要响应的事件上做任何你想做的事情。
如果你想让这个片段可重用,你可以将所有方法定义为partial
,因为它们都返回void
。然后你可以创建任何你想要的定义。
public class MyClass<T> : IObservable<T>
{
private readonly IObservable<T> m_Source;
public MyClass(IObservable<T> source)
{
if (source == null) throw new ArgumentNullException("source");
m_Source = source.Do(OnNext, OnError, OnCompleted);
OnInitialize();
}
public IDisposable Subscribe(IObserver<T> observer)
{
OnSubscribe();
return m_Source.Subscribe(observer);
}
private void OnInitialize()
{
Console.WriteLine("OnInitialize");
}
private void OnSubscribe()
{
Console.WriteLine("OnSubscribe");
}
private void OnNext(T value)
{
Console.WriteLine("OnNext: {0}", value);
}
private void OnError(Exception error)
{
Console.WriteLine("OnError: {0}", error.Message);
}
private void OnCompleted()
{
Console.WriteLine("OnCompleted");
}
}