反应式的“缓冲直到安静”行为

本文关键字:安静 行为 缓冲 反应式 | 更新日期: 2023-09-27 18:35:45

我的问题有点像Nagle算法是为了解决的问题,但不完全是。我想要的是将来自IObservable<T>OnNext通知缓冲成一系列IObservable<IList<T>>,如下所示:

  1. 当第一个T通知到达时,将其添加到缓冲区并开始倒计时
  2. 如果在倒计时到期之前到达另一个T通知,请将其添加到缓冲区并重新启动倒计时
  3. 倒计时到期后(即生产者已经静默了一段时间),将所有缓冲的T通知作为单个聚合IList<T>通知转发。
  4. 如果在倒计时到期之前缓冲区大小超过某个最大值,请仍然发送它。

IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)看起来很有希望,但它似乎会定期发送聚合通知,而不是执行我想要的"在第一个通知到达时启动计时器并在其他通知到达时重新启动它"的行为,并且如果没有从下面生成通知,它还在每个时间窗口结束时发送一个空列表。

不想删除任何T通知;只需缓冲它们。

是否存在这样的东西,或者我需要自己编写?

反应式的“缓冲直到安静”行为

SO 上存在一些类似的问题,但不完全是这样。这是一个可以解决问题的扩展方法。

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                          (this IObservable<TSource> source,
                                           int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

有趣的运算符。Supertopi的答案很好,但可以做出改进。如果maxAmount很大,和/或通知率很高,则使用 Buffer 将通过分配不久后被丢弃的缓冲区来烧毁 GC。

为了在达到maxAmount后关闭每个GroupBy可观察对象,您无需捕获所有这些元素的Buffer,只需知道它何时已满。根据Supertopi的回答,您可以将其稍微更改为以下内容。它不会收集maxAmount元素的Buffer,而只是在流中看到maxAmount元素后发出信号。

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
    return Observable.Create<IList<TSource>>((obs) =>
    {
        return source.GroupByUntil(_ => true,
                                   g => g.Throttle(threshold).Select(_ => Unit.Default)
                                         .Merge(g.Take(maxAmount)
                                                 .LastAsync()
                                                 .Select(_ => Unit.Default)))
                     .SelectMany(i => i.ToList())
                     .Subscribe(obs);
    });
}

不错的解决方案。在我看来,使用现有运算符创建行为只是为了方便,而不是为了性能。

此外,我们应该始终返回 IEnumerable 而不是 IList。返回派生最少的类型 (IEnumerable) 将为你留下最大的余地来更改基础实现。

这是我实现自定义运算符的版本。

public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
    {
        var buffer = new List<TValue>();
        return Observable.Create<IEnumerable<TValue>>(observer =>
        {
            var aTimer = new Timer();
            void Clear()
            {
                aTimer.Stop();
                buffer.Clear();
            }
            void OnNext()
            {
                observer.OnNext(buffer);
                Clear();
            }
            aTimer.Interval = threshold.TotalMilliseconds;
            aTimer.Enabled = true;
            aTimer.Elapsed += (sender, args) => OnNext();
            var subscription = @this.Subscribe(value =>
            {
                buffer.Add(value);
                if (buffer.Count >= maxAmount)
                    OnNext();
                else
                {
                    aTimer.Stop();
                    aTimer.Start();
                }
            });
            return Disposable.Create(() =>
            {
                Clear();
                subscription.Dispose();
            });
        });
    }

通过测试与其他解决方案相比的性能,它可以节省高达 30% 的 CPU 功率并解决内存问题。