反应式的“缓冲直到安静”行为
本文关键字:安静 行为 缓冲 反应式 | 更新日期: 2023-09-27 18:35:45
我的问题有点像Nagle算法是为了解决的问题,但不完全是。我想要的是将来自IObservable<T>
的OnNext
通知缓冲成一系列IObservable<IList<T>>
,如下所示:
- 当第一个
T
通知到达时,将其添加到缓冲区并开始倒计时 - 如果在倒计时到期之前到达另一个
T
通知,请将其添加到缓冲区并重新启动倒计时 - 倒计时到期后(即生产者已经静默了一段时间),将所有缓冲的
T
通知作为单个聚合IList<T>
通知转发。 - 如果在倒计时到期之前缓冲区大小超过某个最大值,请仍然发送它。
IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
看起来很有希望,但它似乎会定期发送聚合通知,而不是执行我想要的"在第一个通知到达时启动计时器并在其他通知到达时重新启动它"的行为,并且如果没有从下面生成通知,它还在每个时间窗口结束时发送一个空列表。
我不想删除任何T
通知;只需缓冲它们。
是否存在这样的东西,或者我需要自己编写?
SO 上存在一些类似的问题,但不完全是这样。这是一个可以解决问题的扩展方法。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
有趣的运算符。Supertopi的答案很好,但可以做出改进。如果maxAmount
很大,和/或通知率很高,则使用 Buffer
将通过分配不久后被丢弃的缓冲区来烧毁 GC。
为了在达到maxAmount
后关闭每个GroupBy
可观察对象,您无需捕获所有这些元素的Buffer
,只需知道它何时已满。根据Supertopi的回答,您可以将其稍微更改为以下内容。它不会收集maxAmount
元素的Buffer
,而只是在流中看到maxAmount
元素后发出信号。
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source, int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge(g.Take(maxAmount)
.LastAsync()
.Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}
不错的解决方案。在我看来,使用现有运算符创建行为只是为了方便,而不是为了性能。
此外,我们应该始终返回 IEnumerable 而不是 IList。返回派生最少的类型 (IEnumerable) 将为你留下最大的余地来更改基础实现。
这是我实现自定义运算符的版本。
public static IObservable<IEnumerable<TValue>> BufferWithThrottle<TValue>(this IObservable<TValue> @this, int maxAmount, TimeSpan threshold)
{
var buffer = new List<TValue>();
return Observable.Create<IEnumerable<TValue>>(observer =>
{
var aTimer = new Timer();
void Clear()
{
aTimer.Stop();
buffer.Clear();
}
void OnNext()
{
observer.OnNext(buffer);
Clear();
}
aTimer.Interval = threshold.TotalMilliseconds;
aTimer.Enabled = true;
aTimer.Elapsed += (sender, args) => OnNext();
var subscription = @this.Subscribe(value =>
{
buffer.Add(value);
if (buffer.Count >= maxAmount)
OnNext();
else
{
aTimer.Stop();
aTimer.Start();
}
});
return Disposable.Create(() =>
{
Clear();
subscription.Dispose();
});
});
}
通过测试与其他解决方案相比的性能,它可以节省高达 30% 的 CPU 功率并解决内存问题。