订阅为最后提出的方法

本文关键字:方法 最后 | 更新日期: 2023-09-27 18:17:54

有没有办法订阅一个方法,尽管当 onNext 被抛出时它会最后调用?

m_subject.Subscribe(() => Console.writeLine("firstSubscription");
m_subject.SubscribeLast(() => Console.writeLine("secondSubscription");
m_subject.Subscribe(() => Console.writeLine("thirdSubscription");
m_subject.OnNext();
// prints:
// firstSubscription
// thirdSubscription
// secondSubscription

订阅为最后提出的方法

不能让订阅者最后执行,但可以将所有调用包装在单个订阅中。

像这样:

Action action = () => {};
Action lastAction = () => {};
m_subject.Subscribe(() => 
{
    action();
    lastAction();
});
action += (() => Console.writeLine("firstSubscription");
lastAction += (() => Console.writeLine("secondSubscription");
action += (() => Console.writeLine("thirdSubscription");
m_subject.OnNext();
// prints:
// firstSubscription
// thirdSubscription
// secondSubscription
您还可以

通过定义自定义Subject<T>来执行此操作,该内部具有默认主题和最后一个主题。

更新

我添加了重载的ObserveOn来存储ISchedulerSynchronizationContext,然后在订阅时应用它们。类似的技术也可用于使SubscribeOn工作。

public class SubscribeLastSubject<T> : ISubject<T>, IDisposable
{
    private readonly Subject<T> subject = new Subject<T>();
    private readonly Subject<T> lastSubject = new Subject<T>();
    private IScheduler observeScheduler;
    private SynchronizationContext observerContext;
    public void OnNext(T value)
    {
        subject.OnNext(value);
        lastSubject.OnNext(value);
    }
    public void OnError(Exception error)
    {
        subject.OnError(error);
        lastSubject.OnError(error);
    }
    public void OnCompleted()
    {
        subject.OnCompleted();
        lastSubject.OnCompleted();
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return GetObservable().Subscribe(observer);           
    }
    public IDisposable SubscribeLast(IObserver<T> observer)
    {
        return GetLastObservable().Subscribe(observer);     
    }
    public IDisposable SubscribeLast(Action<T> action)
    {
        return GetLastObservable().Subscribe(action);
    }
    public SubscribeLastSubject<T> ObserveOn(IScheduler scheduler)
    {
        observeScheduler = scheduler;
        return this;
    }
    public SubscribeLastSubject<T> ObserveOn(SynchronizationContext context)
    {
        observerContext = context;
        return this;
    }
    public void Dispose()
    {
        subject.Dispose();
        lastSubject.Dispose();
    }
    private IObservable<T> GetObservable()
    {
        if (observerContext != null)
        {
            return subject.ObserveOn(observerContext);
        }
        if (observeScheduler != null)
        {
            return subject.ObserveOn(observeScheduler);
        }
        return subject;
    }
    private IObservable<T> GetLastObservable()
    {
        if (observerContext != null)
        {
            return lastSubject.ObserveOn(observerContext);
        }
        if (observeScheduler != null)
        {
            return lastSubject.ObserveOn(observeScheduler);
        }
        return lastSubject;
    }
}

用法

var m_subject = new SubscribeLastSubject<string>();
m_subject.ObserveOn(Scheduler.CurrentThread).Subscribe(s => Console.WriteLine("firstSubscription"));
m_subject.ObserveOn(Scheduler.CurrentThread).SubscribeLast(s => Console.WriteLine("secondSubscription"));
m_subject.ObserveOn(Scheduler.CurrentThread).Subscribe(s => Console.WriteLine("thirdSubscription"));
m_subject.OnNext("1");
Console.ReadKey();

输出

firstSubscription
thirdSubscription
secondSubscription