响应式可观察订阅处理

本文关键字:处理 观察 响应 | 更新日期: 2023-09-27 18:10:43

如果我可以访问一个IObservable,我知道它只会返回一个项目,这会工作吗?这是最好的使用模式吗?

IDisposable disposable = null;
disposable = myObservable.Subscribe(x =>
  {
     DoThingWithItem(x);
     if (disposable != null)
     {
       disposable.Dispose();
     }
  });

响应式可观察订阅处理

Subscribe扩展方法返回的一次性对象只允许您在可观察对象自然结束之前手动取消订阅

如果可观察对象完成- OnCompletedOnError -则订阅已经为您处置。

试试这个代码:

var xs = Observable.Create<int>(o =>
{
    var d = Observable.Return(1).Subscribe(o);
    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));

如果你运行上面的代码,你会看到,当可观察对象完成时,"dispose !"被写入到控制台,而不需要调用.Dispose()来订阅。

有一件重要的事情要注意:垃圾收集器永远不会在可观察订阅上调用.Dispose(),所以如果订阅在超出作用域之前没有(或可能没有)自然结束,那么必须处理订阅。

例如:

var wc = new WebClient();
var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h);
var subscription =
    ds.Subscribe(d =>
        Console.WriteLine(d.EventArgs.Result));

ds可观察对象只有在订阅时才会附加到事件处理程序,只有在可观察对象完成或订阅被解除时才会分离。因为它是一个事件处理程序,所以它永远不会完成,因为它正在等待更多的事件,因此处置是与事件分离的唯一方法(对于上面的例子)。

当你有一个FromEventPattern可观察对象,你知道它只会返回一个值,那么明智的做法是在订阅之前添加.Take(1)扩展方法,以允许事件处理程序自动分离,然后你不需要手动处置订阅。

一样:

var ds = Observable
    .FromEventPattern<
        DownloadStringCompletedEventHandler,
        DownloadStringCompletedEventArgs>(
            h => wc.DownloadStringCompleted += h,
            h => wc.DownloadStringCompleted -= h)
    .Take(1);

免责声明:我还在学习Rx。所以我不是真正的专家,但我相信Subscribe退回的一次性产品只会取消订阅。此外,如果源完成,就像您的情况一样,将自动取消订阅。所以我认为Dispose是多余的,可以安全地删除。

与一些评论相反,从OnNext内部处理订阅并不罕见。

虽然OnCompletedOnError的处理确实是由Subscribe扩展方法创建的包装订阅为您完成的,但您可能希望根据观察到的值取消订阅(例如在您的情况中:第一个)。你并不总是有一个已知只产生一个值的可观察对象

问题是你只有在订阅后才能获得IDisposable。一个可观察对象可能会在返回IDisposable之前调用你的OnNext来取消订阅(取决于它使用的IScheduler之类的东西)。

在这种情况下,System.Reactive.Disposables.SingleAssignmentDisposable会派上用场。它包装了一个IDisposable,您可以稍后分配,如果SingleAssignmentDisposable已经被分配,它将在分配时立即处理它。它还携带一个属性IsDisposed,它最初是false,当Dispose()被调用时被设置为true

:

IObservable<string> source = ...;
var subscription = new SingleAssignmentDisposable();
subscription.Disposable = source.Subscribe(x =>
{
    if (subscription.IsDisposed) // getting notified though I've told it to stop
        return;
    DoThingsWithItem(x);
    if (x == "the last item I'm interested in")
        subscription.Dispose();
});

Take函数将做您正在寻找的事情。在本例中,Take(1) .