“冲洗”可观察扫描

本文关键字:观察 扫描 冲洗 | 更新日期: 2023-09-27 18:31:17

这是一个

奇怪的'问题',我不确定处理它的最佳方法是什么。

为了简化起见,假设我有一个可观察的来源,其中有一些数据来自"外部":

{ Value, TimeStamp }

我正在通过Observable.Scan将其用于输出:

{ Value, TimeStamp, TimeDelta }

这意味着我的数据总是"晚一点"出来,但这不是问题。

我们正在从这个可观察对象中"记录",当您停止一个记录时,仍然有一个数据值"卡住"等待它的关注者。

即使这样也不是问题。问题是,当您再次开始录制时,上一个"录制"的最后一个值会卡在新"录制"的开头。

最明显的事情就是取消订阅并重新订阅,但是......这不是那么简单,因为这个扫描的源不仅被记录下来,而且还被发送到UI,并用于进一步的计算:所以我必须做大量的取消订阅/重新订阅。

我试图想出一种方法来注入某种"重置"数据,但不确定如何将信息发送回可观察流的"向上"......

也许我刚刚咬掉的比我能咀嚼的多?还是使用了太多的可观察性?

“冲洗”可观察扫描

将有许多方法可以做到这一点,但一种相当容易的方法是使用 .Switch() 运算符。

它本质上是这样的:如果你有一个IObservable<IObservable<T>>,你可以调用.Switch()将其变成一个IObservable<T>,它基本上订阅外部可观察量产生的最后一个值,并取消订阅先前生成的可观察量。

现在这听起来有点时髦,但这是它的工作原理。假设你有一个名为outsideObservable的可观察量,那么你定义第二个可观察量(resubscribeObservable),每次你想要重新订阅时都会产生一个值,你订阅它们就像这样:

var subscription =
    resubscribeObservable
        .Select(_ => outsideObservable)
        .Switch()
        .Subscribe(x =>
        {
            /* Do stuff here */
        });

现在要重新订阅outsideObservable您只需从 resubscribeObservable 生成一个值。

最简单的方法是像var resubscribeObservable = new Subject<Unit>();一样定义它,然后在每次要重新订阅时调用resubscribeObservable.OnNext(Unit.Default);

或者,如果您有某个事件,例如用户单击按钮,那么您可以使用基于该事件的可观察量作为您的resubscribeObservable

整合评论中的建议,如下所示:

var factory = Observable.Defer(() => outsideObservable);
var resetterObservable = new Subject<Unit>();
var resettableObservable = 
       resetterObservable
           .StartWith(Unit.Default)
           .Select(_ => factory)
           .Switch()
           .Publish()
           .RefCount();

发布()。RefCount() 只是为了保护outsideObservable免受多个同时订阅的影响。

这就是我把公认的答案归结为什么。尚未投入生产,但测试似乎表明它可以满足我的需求。

public interface IResetter
{
    IObservable<T> MakeResettable<T>(Func<IObservable<T>> selector);
}
public class Resetter : IResetter
{
    private Subject<Unit> _Resetter = new Subject<Unit>();
    public void Reset()
    {
        _Resetter.OnNext(Unit.Default);
    }
    public IObservable<T> MakeResettable<T>(Func<IObservable<T>> selector)
    {
        return
            _Resetter
                .StartWith(Unit.Default)
                .Select(_ => Observable.Defer(selector))
                .Switch()
                .Publish().RefCount();
    }
}