用响应式扩展动态连接序列

本文关键字:连接 动态 扩展 响应 | 更新日期: 2023-09-27 17:54:12

我想创建一个序列,它连接一个或多个动态创建的序列(在运行时)。

我尝试使用mySequence = mySequence.Concat(anotherSequence),但这会破坏当前对mySequence的订阅,因为每次都创建一个新的序列。

用响应式扩展动态连接序列

当你将一个可观察序列连接到另一个序列时,第一个序列必须在你从第二个序列获得任何值之前结束。这听起来更像是要合并两个或多个序列——换句话说,只要序列产生值,就从任何序列中获取值。

所以,如果你允许我把.Concat改为.Merge,听起来你现在有这样的代码:

IObservable<long> mySequence = Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5);
IDisposable mySequenceSubscription = mySequence.Subscribe(n => Console.WriteLine(n));
IObservable<long> anotherSequence = Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5);
mySequence = mySequence.Merge(anotherSequence);

如果我运行它,我得到这些值:

<>之前0123.4之前

第二个序列没有合并。

现在,如果你在创建订阅时不知道你想要合并的未来可观察对象是什么,那么你可以这样做:

Subject<IObservable<long>> sources = new Subject<System.IObservable<long>>();
IDisposable sourceSubscription = sources.Merge().Subscribe(n => Console.WriteLine(n));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.5)).Take(5));
sources.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.2)).Take(5));

现在结果如下所示:

<>之前01023.4123.4之前

正确地合并了在订阅之后添加的两个可观察对象。简单。