RX Switch()'s订阅和取消订阅顺序

本文关键字:取消 顺序 Switch RX | 更新日期: 2023-09-27 18:19:10

所以我已经写了一个我遇到的问题的草稿。我有一个iobservertable >,其中包含我的流,我想使用switch从它获得最新的项目,但是我的问题可以用下面的代码巧妙地演示:

        var obs = Observable.Create<IObservable<int>>(sub =>
        {
            var item = Observable.Create<int>(innersub =>
            {
                var count = 0;
                return Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(x => innersub.OnNext(count++));
            }).Publish().RefCount();
            return Observable.Interval(TimeSpan.FromSeconds(10)).Subscribe(x => sub.OnNext(item));
        });
        obs.Switch().Subscribe(x => Console.WriteLine(x));

上面的测试用例显示,当与Publish(). refcount()结合使用时,交换机首先取消订阅,然后订阅新项目。

我想要的是一个连续的数字流,但是测试显示"项目"在新订阅点击之前首先被处理掉,我失去了计数,必须重新开始。

如果项目是相同的,并且使用了refcount,我希望的是订阅首先发生,所以refcount是满意的,然后旧的订阅被处理掉。这种行为是RX可以默认演示的吗?还是需要一些调整才能正确?我相信我可以根据RX源代码的精简版本编写一个足够简单的扩展方法,但如果它已经存在或有更好的方法,我想先知道。

编辑:所写的代码是一个幼稚的例子,以一种简单的方式来演示问题。我实际上拥有的是一个定期发布新可观察对象的可观察对象,它有不同的过滤器,但最终归结为相同的publish/refcount可观察对象。where子句改变了,或者select做了一些不同的事情。真正的用途是几个流的. merge(),所以我对我的逻辑和问题的结论很有信心)。我知道我的例子可以简化。

RX Switch()'s订阅和取消订阅顺序

在当前可观察对象被订阅之前,你必须查看前一个可观察对象被处置的源。这就是.Switch()的工作原理。

如果Rx在新订阅之后处理了,则代码的意图似乎相当于简单地这样做:

var obs = Observable.Create<int>(innersub =>
{
    var count = 0;
    return Observable.Interval(TimeSpan.FromSeconds(2))
        .Subscribe(x => innersub.OnNext(count++));
});
obs.Subscribe(x => Console.WriteLine(x));

在这个例子中,它被简化为:

var obs = Observable.Interval(TimeSpan.FromSeconds(2));
obs.Subscribe(x => Console.WriteLine(x));

也许你可以让我们知道你的潜在需求是什么,我们可以解决这个问题?

这个操作符主要是为冷观测设计的。Switch在订阅新可观察对象之前取消订阅前一个可观察对象。否则,将存在一个竞争条件,在同时订阅两个服务的短暂时间内,可能会漏掉额外的事件。

因为你的底层可观察对象是热的,你可以考虑一个替代的解决方案,你只是修改过滤器/选择"在飞行",而不是使用Switch"重新订阅"。比如:

source
    .Where(t => ApplyCurrentFilter(t))
    .Select(t => ApplyCurrentProjection(t))
    .Subscribe(...);
// do something that changes what `ApplyCurrentFilter` does...

我不知道这比你目前的解决方案是好还是坏,但它确实避免了从源数据取消订阅/重新订阅的需要。

已经说过了,Observable。Create生成一个cold Observable,然后Publish。RefCount只在仍有订阅者时才使其热。可以编写自己的Switch版本,在处理旧的订阅者之前订阅新的订阅者。但我会对比赛情况非常警惕。总的来说,这样做感觉有点奇怪,在Rx中,这通常表明有另一种更干净的方法来做你想做的事情。

在这个例子中,如果你有想要的结果,没有必要发布许多可观察对象并切换它们,因为实际上你只想在整个持续时间内订阅一个可观察对象。因此,它可以归结为Enigmativity所说的。

然而,显然这是一个人为的例子,所以让我们假设有一个更复杂的情况需要这种方法——如果你能详细说明,可能会有所帮助。从这个例子中,似乎你只想订阅一次内部可观察对象。基于这个要求,RefCount是不合适的,但我假设你使用它是因为你想在核心处有一个共享的可观察对象,你用其他操作符包装它,你希望每次都有不同的行为。如果是这种情况,您可以使用这样的方法:

var obs = Observable.Create<IObservable<int>>(sub =>
{
   var item = Observable.Create<int>(innersub =>
   {
       var count = 0;
       return Observable.Interval(TimeSpan.FromSeconds(2))
                        .Subscribe(x => innersub.OnNext(count++));
   }).Publish();
   bool connected = false;
   var disposables = new CompositeDisposable();
   disposables.Add(Observable.Interval(TimeSpan.FromSeconds(10))
                             .Subscribe(x =>
                             {
                                 // push the new stream to the observer first
                                 sub.OnNext(item);
                                 if (!connected)
                                 {
                                     connected = true;
                                     disposables.Add(item.Connect());
                                 }
                             }));
   return disposables;
});

我没有考虑过这种方法的潜在竞争条件,等等,这在很大程度上取决于你的实际情况。然而,在原始帖子的基本测试中,这似乎是你想要的。