平滑Rx观测值

本文关键字:Rx 平滑 | 更新日期: 2023-09-27 18:17:21

非常类似于这个问题:Rx IObservable缓冲平滑突发事件,我对平滑可能发生突发事件的observable很感兴趣。

希望下面的图表说明了我的目标:

Raw:       A--B--CDE-F--------------G-----------------------
Interval:  o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o
Output:    A--B--C--D--E--F-----------G---------------------

给定原始流,我希望在有规律的间隔内扩展这些事件。

节流不起作用,因为我最终会丢失原始序列的元素。

如果原始流比计时器更频繁,则Zip工作良好,但如果存在没有原始事件的时间段则失败。

编辑

作为对Dan的回答的回应,Buffer的问题是,如果许多事件在短时间间隔内爆发,那么我接收事件的频率就太高了。下面显示了缓冲区大小为3时可能发生的情况,并将超时配置为所需的时间间隔:

Raw:       -ABC-DEF-----------G-H-------------------------------
Interval:  o--------o--------o--------o--------o--------o--------
Buffered:  ---A---D-------------------G--------------------------
              B   E                   H
              C   F
Desired:   ---------A--------B--------C--------D--------E ..etc.

平滑Rx观测值

这个怎么样?(受James在评论中提到的答案的启发)…

public static IObservable<T> Regulate<T>(this IObservable<T> source, TimeSpan period)
{
    var interval = Observable.Interval(period).Publish().RefCount();
    return source.Select(x => Observable.Return(x)
                                        .CombineLatest(interval, (v, _) => v)
                                        .Take(1))
                 .Concat();
}

它将原始可观察对象中的每个值转换为它自己的可观察对象。CombineLatest意味着它不会产生值,直到间隔产生值。然后我们只需要从每个可观察对象中取一个值并连接起来。

原始可观察对象中的第一个值延迟一个周期。

看起来你想使用的是Buffer。其中一个重载允许您指定间隔以及缓冲区长度。可以将长度设置为1。

Raw.Buffer(interval, 1);

有关其使用的更多示例,您可以参考IntroToRX站点。