有条件地组合两个 Rx 流

本文关键字:两个 Rx 组合 有条件 | 更新日期: 2023-09-27 18:34:17

我正在尝试使用 Rx 实现一个场景,其中我有两个热可观察量。流 1 和流 2。根据流 1 的数据,我需要启动流 2 或停止流 2。然后使用 CombineLatest 将两个流数据合并为一个。下面是我能够想出的代码。

  1. 有没有更好的方法来实现这一点?

  2. 我怎样才能让它更通用,就像我将有流 1 然后是流 2 .. n 对于来自 2 的每个流.. n 有条件条件 2 .. n 利用流 1 的数据来检查其他流是否需要启动,然后以 CombineLatest 的方式组合所有数据

法典:

        IDisposable TfsDisposable = null;
        // stream 1
        var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));

        // stream 2
        var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1)).Publish();

        var observerHot =  hotObs.Do(a => 
        {
            // Based on Condition to start the second stream
            if (ConditionToStartStream2)
            {
                TfsDisposable = TfsDisposable ?? hotObs2.Connect();
            }
        })
        .Do(a => 
        {
            // Based on condition 2 stop the second stream
            if (ConditionToStopStream2)
            {
                TfsDisposable?.Dispose();
                TfsDisposable = null;
            }
        }).Publish();

        // Merge both the stream using Combine Latest
        var finalMergedData  = hotObs.CombineLatest(hotObs2, (a, b) => { return string.Format("{0}, {1}", a, b); });
        // Display the result
        finalMergedData.Subscribe(a => { Console.WriteLine("result: {0}", a);  });
        // Start the first hot observable
        observerHot.Connect();

有条件地组合两个 Rx 流

玩这个:

var hotObs = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0));
var hotObs2 = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(0.3));
var query =
    hotObs2.Publish(h2s =>
        hotObs.Publish(hs =>
            hs
                .Select(a => a % 7 == 0 ? h2s : Observable.Empty<long>())
                .Switch()
                .Merge(hs)));

这会获取两个可观察量,并使用重载发布它们,该重载在 lambda 中发布它们。它使它们在 lambda 范围内变热,并防止需要通过管理对.Connect()的调用来四处走动。

然后我只是执行条件检查(在这种情况下a偶数),然后返回另一个流,如果不返回空流。

然后,.Switch通过仅从最新的内部可观测值中获取价值,将IObservable<IObservable<long>>变成IObservable<long>

最后,它与原始hs流合并。

使用上面的示例代码,我得到以下输出:

012312345672324258