Rx 缓冲区,不对订阅者进行空调用

本文关键字:调用 缓冲区 Rx | 更新日期: 2023-09-27 18:36:05

在使用.Net 4.6的WPF应用程序中,我有一个事件以很高的速率(每秒几百个)触发新数据点,但并非一直如此。此数据显示在图表中。

我想每 50 毫秒更新一次图表,而不是在每个新数据点之后更新一次。
为了实现这一目标,我使用 Rx 的 Buffer(TimeSpan.FromMilliseconds(50)),理论上效果很好。但是,如果没有创建新的数据点,我的订阅者也会每 50 毫秒调用一次,这不是我想要的。

我创建了一个小示例应用程序来测试它:

using System;
using System.Reactive.Linq;
namespace RxTester
{
    public class Program
    {
        private static event EventHandler TheEvent;
        static void Main(string[] args)
        {
            var observable = Observable.FromEvent<EventHandler, EventArgs>(h => (s, e) => h(e), h => TheEvent += h, h => TheEvent -= h);
            var subscriber = observable.Buffer(TimeSpan.FromMilliseconds(1000))
                .Subscribe(e => Console.WriteLine($"{DateTime.Now.ToLongTimeString()}: {e.Count} elements received..."));
            var random = new Random();
            var timer = new System.Timers.Timer(2000)
                {
                    AutoReset = true,
                    Enabled = true
                };
            timer.Elapsed += (s, e) =>
                {
                    var amount = random.Next(1, 10);
                    for (int i = 0; i < amount; ++i)
                        TheEvent?.Invoke(null, null);
                };
            Console.ReadLine();
            timer.Enabled = false;
            subscriber.Dispose();
        }
    }
}

您需要添加"Rx-Linq"NuGet 包才能运行或使用以下 Fiddle:https://dotnetfiddle.net/TV5tD4

在那里,您会看到几个"收到 0 个元素",这就是我想避免的。我知道我可以简单地检查e.Count == 0,但是由于我使用了多个这样的缓冲区,这对我来说似乎不是最佳的。

有没有办法仅在元素可用时才创建新的元素缓冲块?
我也愿意采用其他方法来解决按时间批处理事件的问题 - 我已经研究了 TPL 数据流BatchBlock,但这似乎只支持基于计数的块大小。

Rx 缓冲区,不对订阅者进行空调用

我们可以再次使用强大的GroupByUntil方法来创建此扩展

public static IObservable<IList<TSource>> BufferWhenAvailable<TSource>
                                          (this IObservable<TSource> source, 
                                           TimeSpan threshold)
{
    return source.Publish( sp => 
                    sp.GroupByUntil(_ => true, _ => Observable.Timer(threshold))
                      .SelectMany(i => i.ToList()));
}

这样做的标准方法是简单地

.Buffer(period)
.Where(buffer=>buffer.Any())

所以有效地做你想避免的事情(count==0).但是,这张支票非常便宜,我会想象是否比所涉及的其他费用便宜得多,即调度。唯一担心的可能是正在发生的数量分配(每 50 毫秒创建一个 List<T> ),然后是可能建立的即将到来的 GC Gen0 压力。