我如何切换()两个流,并在它们之间发出一个中间事件

本文关键字:之间 一个 事件 中间 何切换 两个 | 更新日期: 2023-09-27 18:09:00

我有一个流的流。当一个流结束,下一个流开始时,我需要发出一个中间事件来连接它们。这需要在前一个流的最后一个事件之后发生,并且需要改变最后一个事件的状态。基本上,输出流需要像这样:

Stream 1 - event 1
Stream 1 - event 2
Stream 1 - event 3  (Stream 1 completes)
* My bridging event (Mutation of event 3)
Stream 2 - event 1 etc

我如何切换()两个流,并在它们之间发出一个中间事件

只需将每个内部序列转换为最后一个元素的单个串联突变。

IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
    .Select(innerSource => innerSource
        .Publish(o => o.Concat(o.LastAsync().Select(mutation))))
    .Concat();

听起来你可能想要Concat而不是Switch。听起来像是一个流结束,你得到一个桥接值,然后你钩到下一个流。

使用Switch,您可以得到:

  1. 流A开始
  2. 从可观察对象流出的流事件
  3. 流B开始-你的可观察对象从A取消订阅并切换到B

问题是,如果你在旧流完成之前得到一个新流,那么旧流上的LastAsync不会触发,你就不会得到你的桥接事件。

如果你使用Concat,你的可观察对象不会切换到流B,直到流A完成,如果你使用Timothy Shields的方法,你会在A结束时得到你的突变的最后值。

然而,如果您的情况是,您确实希望在新流到达时立即切换,但是您希望在那个时候进行桥接事件,那么这就有点棘手了。这可能是蒂莫西·希尔兹的回答的改编版,比如:

IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
    .Select(innerSource => innerSource
        .Publish(o => {
               // end this stream when source pushes another value
               var stream = o.TakeUntil(source);
               return stream.Concat(stream.LastAsync().Select(mutation)));
               })
        .Concat();

可能有更好/更干净的方法来做这件事。对于Rx,总是值得尝试一些看起来复杂的不同方法,因为有时一种方法比其他方法简单得多。