“冲洗”可观察扫描
本文关键字:观察 扫描 冲洗 | 更新日期: 2023-09-27 18:31:17
奇怪的'问题',我不确定处理它的最佳方法是什么。
为了简化起见,假设我有一个可观察的来源,其中有一些数据来自"外部":
{ Value, TimeStamp }
我正在通过Observable.Scan将其用于输出:
{ Value, TimeStamp, TimeDelta }
这意味着我的数据总是"晚一点"出来,但这不是问题。
我们正在从这个可观察对象中"记录",当您停止一个记录时,仍然有一个数据值"卡住"等待它的关注者。
即使这样也不是问题。问题是,当您再次开始录制时,上一个"录制"的最后一个值会卡在新"录制"的开头。
最明显的事情就是取消订阅并重新订阅,但是......这不是那么简单,因为这个扫描的源不仅被记录下来,而且还被发送到UI,并用于进一步的计算:所以我必须做大量的取消订阅/重新订阅。
我试图想出一种方法来注入某种"重置"数据,但不确定如何将信息发送回可观察流的"向上"......
也许我刚刚咬掉的比我能咀嚼的多?还是使用了太多的可观察性?
将有许多方法可以做到这一点,但一种相当容易的方法是使用 .Switch()
运算符。
它本质上是这样的:如果你有一个IObservable<IObservable<T>>
,你可以调用.Switch()
将其变成一个IObservable<T>
,它基本上订阅外部可观察量产生的最后一个值,并取消订阅先前生成的可观察量。
现在这听起来有点时髦,但这是它的工作原理。假设你有一个名为outsideObservable
的可观察量,那么你定义第二个可观察量(resubscribeObservable
),每次你想要重新订阅时都会产生一个值,你订阅它们就像这样:
var subscription =
resubscribeObservable
.Select(_ => outsideObservable)
.Switch()
.Subscribe(x =>
{
/* Do stuff here */
});
现在要重新订阅outsideObservable
您只需从 resubscribeObservable
生成一个值。
最简单的方法是像var resubscribeObservable = new Subject<Unit>();
一样定义它,然后在每次要重新订阅时调用resubscribeObservable.OnNext(Unit.Default);
。
或者,如果您有某个事件,例如用户单击按钮,那么您可以使用基于该事件的可观察量作为您的resubscribeObservable
。
整合评论中的建议,如下所示:
var factory = Observable.Defer(() => outsideObservable);
var resetterObservable = new Subject<Unit>();
var resettableObservable =
resetterObservable
.StartWith(Unit.Default)
.Select(_ => factory)
.Switch()
.Publish()
.RefCount();
发布()。RefCount() 只是为了保护outsideObservable
免受多个同时订阅的影响。
这就是我把公认的答案归结为什么。尚未投入生产,但测试似乎表明它可以满足我的需求。
public interface IResetter
{
IObservable<T> MakeResettable<T>(Func<IObservable<T>> selector);
}
public class Resetter : IResetter
{
private Subject<Unit> _Resetter = new Subject<Unit>();
public void Reset()
{
_Resetter.OnNext(Unit.Default);
}
public IObservable<T> MakeResettable<T>(Func<IObservable<T>> selector)
{
return
_Resetter
.StartWith(Unit.Default)
.Select(_ => Observable.Defer(selector))
.Switch()
.Publish().RefCount();
}
}