做';中间IObservables';在没有最终订阅者的情况下,在根IObservable的整个生命周期中都

本文关键字:在根 IObservable 情况下 周期 生命 IObservables 中间 | 更新日期: 2023-09-27 18:27:57

例如,考虑以下内容:

    public IDisposable Subscribe<T>(IObserver<T> observer)
    {
        return eventStream.Where(e => e is T).Cast<T>().Subscribe(observer);
    }

eventStream是一个长期存在的事件来源。短寿命客户端将使用此方法订阅一段时间,然后通过对返回的IDisposable调用Dispose来取消订阅。

然而,当eventStream仍然存在并且应该保存在内存中时,已经有2个新的IObservables由该方法创建——由Where()方法返回的一个可能由eventStream保持在内存中,由Cast<T>()方法返回的那个可能由Where()方法返回在内存中。

这些"中间IOobservable"(有更好的名字吗?)将如何清理?或者,即使它们不再有订阅,并且除了它们的源IObservable之外没有其他人引用它们,它们现在会在eventStream的生命周期内存在吗?因此,它们将不再有订阅?

如果他们通过通知父母他们不再订阅而被清除,他们怎么知道没有其他人引用他们,并可能在以后订阅他们?

做';中间IObservables';在没有最终订阅者的情况下,在根IObservable的整个生命周期中都

然而,尽管eventStream仍然存在,并且应该保存在内存中,但该方法已经创建了2个新的IObservable-由Where()方法返回的一个可能由eventStream保存在内存,由Cast()方法返回的一个则可能由Where)方法返回在内存中。

你已经落后了。让我们来了解一下正在发生的事情。

IObservable<T> eventStream; //you have this defined and assigned somewhere
public IDisposable Subscribe<T>(IObserver<T> observer)
{
    //let's break this method into multiple lines
    IObservable<T> whereObs = eventStream.Where(e => e is T);
    //whereObs now has a reference to eventStream (and thus will keep it alive), 
    //but eventStream knows nothing of whereObs (thus whereObs will not be kept alive by eventStream)
    IObservable<T> castObs = whereObs.Cast<T>();
    //as with whereObs, castObs has a reference to whereObs,
    //but no one has a reference to castObs
    IDisposable ret = castObs.Subscribe(observer);
    //here is where it gets tricky.
    return ret;
}

ret引用或不引用什么取决于各种可观察性的实现。根据我在Rx库中的Reflector和我自己编写的操作符中看到的情况,大多数操作符都不会返回引用了可观察操作符本身的可丢弃对象。

例如,Where的基本实现类似于(直接在编辑器中键入,无错误处理)

IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> filter)
{
    return Observable.Create<T>(obs =>
      {
         return source.Subscribe(v => if (filter(v)) obs.OnNext(v),
                                 obs.OnError, obs.OnCompleted);
      }
}

请注意,一次性返回的将通过创建的观察器引用过滤器函数,但不会引用可观察的Where。使用相同的模式可以容易地实现CCD_ 15。从本质上讲,运营商变成了观察者包装工厂。

所有这些对手头问题的含义是,在方法结束时,中间IObservable有资格进行垃圾收集。传递给Where的过滤函数与订阅一样长时间保持不变,但一旦订阅被释放或完成,只有eventStream保留(假设它仍然有效)。

EDIT关于supercat的评论,让我们看看编译器如何重写它,或者如何在没有闭包的情况下实现它。

class WhereObserver<T> : IObserver<T>
{
    WhereObserver<T>(IObserver<T> base, Func<T, bool> filter)
    {
        _base = base;
        _filter = filter;
    }
    IObserver<T> _base;
    Func<T, bool> _filter;
    void OnNext(T value)
    {
        if (filter(value)) _base.OnNext(value);
    }
    void OnError(Exception ex) { _base.OnError(ex); }
    void OnCompleted() { _base.OnCompleted(); }
}
class WhereObservable<T> : IObservable<T>
{
    WhereObservable<T>(IObservable<T> source, Func<T, bool> filter)
    {
        _source = source;
        _filter = filter;
    }
    IObservable<T> source;
    Func<T, bool> filter;
    IDisposable Subscribe(IObserver<T> observer)
    {
        return source.Subscribe(new WhereObserver<T>(observer, filter));
    }
}
static IObservable<T> Where(this IObservable<T> source, Func<T, bool> filter)
{
    return new WhereObservable(source, filter);
}

您可以看到,观察者不需要对生成它的可观察对象进行任何引用,可观察对象也不需要跟踪它创建的观察对象。我们甚至没有让任何新的IDisposable从我们的订阅中返回。

事实上,Rx有一些用于匿名可观察/观察者的实际类,这些类接受委托并将接口调用转发给这些委托。它使用闭包来创建这些委托。编译器不需要发出实际实现接口的类,但翻译的精神保持不变。

我想我是在Gideon的回答的帮助下得出结论的,并分解了一个示例Where方法:

我错误地假设每个下游IObservable在任何时候都被上游引用(以便在需要时向下推送事件)。但这将在上游的生命周期内将下游从内存中根除。

事实上,每个上行IObservable都被下行IObservable引用(等待,准备在需要时挂接IObserver)。只要下游被引用,这就将上游植根于内存中(这是有道理的,因为当下游仍在某个地方被引用时,订阅可能随时发生)。

然而,当订阅确实发生时,这种从上游到下游的引用链确实会形成,但仅在管理每个可观察阶段的订阅的IDisposable实现对象上,并且仅在该订阅的生存期内形成。(这也是有道理的——虽然存在订阅,但每个上游"处理逻辑"仍必须保存在内存中,以处理通过到达最终订户IObserver的事件)。

这为这两个问题提供了解决方案——当引用IObservable时,它将在内存中保存所有源(上游)IObservables,为订阅做好准备。当订阅存在时,它将在内存中保存所有下游订阅,允许最终订阅仍然接收事件,即使它的源IObservable可能不再被引用。

将此应用到我问题中的示例中,WhereCast下游可观察器的寿命非常短,直到Subscribe(observer)调用完成为止。然后可以自由收集。现在可以收集中间可观测值这一事实不会给刚刚创建的订阅带来问题,因为它已经形成了自己的订阅对象链(上游->下游),该订阅对象链以源eventStream可观测值为根。该链将在每个下游阶段部署其IDisposable订阅跟踪器后立即发布。

您需要记住IObserable<T>(与IEnumerable<T>一样)是惰性列表。直到有人试图通过订阅或迭代来访问元素,它们才存在。

当您编写list.Where(x => x > 0)时,您并不是在创建一个新列表,您只是在定义如果有人试图访问这些元素,新列表会是什么样子。

这是一个非常重要的区别。

你可以考虑有两种不同的IOobservable。一个是定义和订阅的实例。

IOobservable定义几乎不使用内存。参考文献可以自由共享。它们将被干净的垃圾收集起来。

订阅的实例只有在有人订阅的情况下才存在。它们可能需要相当大的内存。除非使用.Publish扩展,否则无法共享引用。当订阅结束或通过调用.Dispose()终止时,将清理内存。

为每个新订阅创建一组新的订阅实例。当最终的子订阅被处置时,整个链被处置。它们不能共享。如果存在第二个订阅,则会创建独立于第一个订阅的完整订阅实例链。

我希望这能有所帮助。

实现IObservable的类只是一个常规对象。当GC运行并且没有看到任何对它的引用时,它会被清理。它不是"new object()什么时候被清理"。除了内存使用之外,它们是否被清理对于您的程序来说应该是不可见的。

如果一个对象订阅了事件,无论是为了自己使用,还是为了将它们转发给其他对象,这些事件的发布者通常会保持它的活动状态,即使没有其他人会这样做。如果我正确理解您的情况,您有订阅事件的对象,目的是将它们转发给零个或多个其他订阅者。我建议,如果可能的话,你应该设计你的中间IObservable,这样在有人订阅他们的活动之前,他们不会订阅父母的活动,并且他们会在最后一个订阅者取消订阅时取消订阅父母的事件。这是否实用将取决于父对象和子对象IOobservable的线程上下文。进一步注意,(同样取决于线程上下文)可能需要锁定来处理新订户大约在最后订户退出的同时加入的情况。尽管大多数对象的订阅和取消订阅场景都可以使用CompareExchange而不是锁定来处理,但在涉及互连订阅列表的场景中,这通常是不可行的。

如果您的对象将在与父对象的订阅和取消订阅方法不兼容的线程上下文中从其子对象接收订阅和取消预订(IMHO,IObservable应该要求所有合法实现都允许从任意线程上下文中订阅和取消订购,但遗憾的是它没有),您可能别无选择,只能使用intermediate IOobservable在创建后立即创建一个代理对象来代表您处理订阅,并让该对象订阅父级的事件。然后让您自己的对象(代理对其只有弱引用)包括一个终结器,该终结器将通知代理,当其父线程上下文允许时,它需要取消订阅。如果您的代理对象在其最后一个订阅者退出时取消订阅,那将是一件好事,但如果一个新订阅者可能会加入并期望其订阅立即有效,那么只要有人持有可用于请求新订阅的中间观察者的引用,就可能必须保留订阅的代理。