如何在使用 Switch 语句时处理内部可观察对象

本文关键字:内部 处理 观察 对象 语句 Switch | 更新日期: 2023-09-27 18:31:51

我有一个嵌套的IObservable,我正在使用Rx的switch语句来帮助我处理以前的序列。但是,如果我想手动处置怎么办?释放外部序列不是一种选择。

_performSearchSubject
.Select(_ => return PerformQuery())
            .Switch()
            .Subscribe(HandleResponseStream, HandleError);

执行查询返回IObservable<Result> ;

如何在使用 Switch 语句时处理内部可观察对象

一段时间后,我自己找到了它...所以答案是:

您可以使用 TakeUntil(IObservable<TOther>) ,然后只需传入一个新主题,只要您想取消以前的流,就可以调用该主题。我认为它类似于 Switch() 语句在底层流中所做的。

最终代码如下所示:

Subject<Unit> _cancellationObservable = new Subject<Unit>();
_performSearchSubject
.Select(_ => {
                return PerformQuery().TakeUntil(_cancellationObservable);
              })
            .Switch()
            .Subscribe(HandleResponseStream, HandleError);

每当我想取消它时,我就打电话给这个人:

_cancellationObservable.OnNext(Unit.Default);

在以下任一情况下,"PerformQuery"返回的可观察流将自动释放:

  • 您的"Switch"语句由于新的"_performSearchSubject"而执行,即之前的流将由"Switch"运算符清理。或
  • 您的整体查询将被释放。

您应该存放由您的 .订阅并使用它来清理您的订阅。您不必使用内部主题。

你真的尝试过你原来的解决方案吗?这里有一些 Linqpad 代码来证明这一点:

var inner = Observable.Create<string>((o) =>
{
    o.OnNext("First item of new inner");
    return Disposable.Create(() => "Inner Disposed".Dump());
});
var outer = Observable.Timer(TimeSpan.MinValue, TimeSpan.FromSeconds(10))
        .Select(_ => inner)
        .Switch()
        .Subscribe(output => output.Dump());

Console.ReadLine();
outer.Dispose();

您会注意到,无论您何时按下键并输入(通过 Console.ReadLine),您的内部可观察流都将被处理掉。同样,每次触发计时器时,Switch 语句都会清理任何以前的流。

如果您希望基于不相关的流进行控制,您仍然可以使用"TakeUntil",但是,如果可以的话,最好只使用一次性 rerturn。