我如何切换()两个流,并在它们之间发出一个中间事件
本文关键字:之间 一个 事件 中间 何切换 两个 | 更新日期: 2023-09-27 18:09:00
我有一个流的流。当一个流结束,下一个流开始时,我需要发出一个中间事件来连接它们。这需要在前一个流的最后一个事件之后发生,并且需要改变最后一个事件的状态。基本上,输出流需要像这样:
Stream 1 - event 1
Stream 1 - event 2
Stream 1 - event 3 (Stream 1 completes)
* My bridging event (Mutation of event 3)
Stream 2 - event 1 etc
只需将每个内部序列转换为最后一个元素的单个串联突变。
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
.Select(innerSource => innerSource
.Publish(o => o.Concat(o.LastAsync().Select(mutation))))
.Concat();
听起来你可能想要Concat而不是Switch。听起来像是一个流结束,你得到一个桥接值,然后你钩到下一个流。
使用Switch,您可以得到:
- 流A开始
- 从可观察对象流出的流事件
- 流B开始-你的可观察对象从A取消订阅并切换到B
问题是,如果你在旧流完成之前得到一个新流,那么旧流上的LastAsync不会触发,你就不会得到你的桥接事件。
如果你使用Concat,你的可观察对象不会切换到流B,直到流A完成,如果你使用Timothy Shields的方法,你会在A结束时得到你的突变的最后值。
然而,如果您的情况是,您确实希望在新流到达时立即切换,但是您希望在那个时候进行桥接事件,那么这就有点棘手了。这可能是蒂莫西·希尔兹的回答的改编版,比如:
IObservable<IObservable<T>> source = ...;
Func<T, T> mutation = ...;
IObservable<T> query = source
.Select(innerSource => innerSource
.Publish(o => {
// end this stream when source pushes another value
var stream = o.TakeUntil(source);
return stream.Concat(stream.LastAsync().Select(mutation)));
})
.Concat();
可能有更好/更干净的方法来做这件事。对于Rx,总是值得尝试一些看起来复杂的不同方法,因为有时一种方法比其他方法简单得多。