响应式扩展是否支持滚动缓冲区?
本文关键字:滚动 缓冲区 支持 是否 扩展 响应 | 更新日期: 2023-09-27 18:09:58
我正在使用响应式扩展将数据整理到100ms的缓冲区中:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
这很好。但是,我想要与Buffer
操作提供的行为略有不同。实际上,如果接收到另一个数据项,我想重置计时器。只有在整个100ms内没有接收到任何数据时,我才想处理它。这就提供了永远不处理数据的可能性,所以我也应该能够指定最大计数。我会想象类似这样的内容:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
我已经环顾四周,并没有能够找到这样的东西在Rx?有人能证实/否认这一点吗?
通过结合Observable
的内置Window
和Throttle
方法来实现。首先,让我们解决忽略最大计数条件的简单问题:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
}
功能强大的Window
方法完成了繁重的工作。现在很容易看到如何添加最大计数:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
var closes = stream.Throttle(delay);
if (max != null)
{
var overflows = stream.Where((x,index) => index+1>=max);
closes = closes.Merge(overflows);
}
return stream.Window(() => closes).SelectMany(window => window.ToList());
}
我将在我的博客上写一篇文章来解释这一点。https://gist.github.com/2244036Window方法的文档:
- http://leecampbell.blogspot.co.uk/2011/03/rx-part-9join-window-buffer-and-group.html
- http://enumeratethis.com/2011/07/26/financial-charts-reactive-extensions/
我写了一个扩展来做大多数你之后- BufferWithInactivity
。
在这里:
public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
this IObservable<T> source,
TimeSpan inactivity,
int maximumBufferSize)
{
return Observable.Create<IEnumerable<T>>(o =>
{
var gate = new object();
var buffer = new List<T>();
var mutable = new SerialDisposable();
var subscription = (IDisposable)null;
var scheduler = Scheduler.ThreadPool;
Action dump = () =>
{
var bts = buffer.ToArray();
buffer = new List<T>();
if (o != null)
{
o.OnNext(bts);
}
};
Action dispose = () =>
{
if (subscription != null)
{
subscription.Dispose();
}
mutable.Dispose();
};
Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
onAction =>
{
lock (gate)
{
dispose();
dump();
if (o != null)
{
onAction(o);
}
}
};
Action<Exception> onError = ex =>
onErrorOrCompleted(x => x.OnError(ex));
Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());
Action<T> onNext = t =>
{
lock (gate)
{
buffer.Add(t);
if (buffer.Count == maximumBufferSize)
{
dump();
mutable.Disposable = Disposable.Empty;
}
else
{
mutable.Disposable = scheduler.Schedule(inactivity, () =>
{
lock (gate)
{
dump();
}
});
}
}
};
subscription =
source
.ObserveOn(scheduler)
.Subscribe(onNext, onError, onCompleted);
return () =>
{
lock (gate)
{
o = null;
dispose();
}
};
});
}
使用Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区过载来满足这两个需求:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100), 1)
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
参见https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx获取文档
正如Rohit Sharma在他对Panic上校的解决方案的评论中提到的那样,存在一个问题,即项目将被缓冲,除非项目生成,否则不会推送给订阅者。
正如这条评论所描述的,问题是p.Window(() => closes)
,因为它打开了一个可能错过事件的间隙。
该lambda将在每个窗口被处理后被调用。Window操作符会在每次lambda返回时调用Subscribe,因为据它所知,你可能每次都从lambda返回一个完全不同的IObservable
因为现在总是使用相同的lambda,我们需要调整maxCount。如果没有更改,maxCount将永远不会被重置,并且在它被击中一次之后,每个新事件都将超过maxCount。
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
var publish = stream.Publish(p =>
{
var closes = p.Throttle(delay);
if (maxCount != null)
{
Int32 i = 0;
var overflows = p.Where(x =>
{
++i;
if (i >= maxCount)
{
i = 0;
return true;
}
return false;
});
closes = closes.Merge(overflows);
}
return p.Window(closes).SelectMany(window => window.ToList());
});
return publish;
}
更新:
经过进一步的测试,我发现在某些情况下,项目仍然不能正确地推送到订阅者。
这是我们已经工作了4个月没有任何问题的解决方案。
解决方法是将.Delay(...)
与任何TimeSpan
一起添加。
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
var publish = stream.Publish(p =>
{
var closes = p.Throttle(delay);
if (maxCount != null)
{
var overflows = stream.Where((x, index) => index + 1 >= maxCount);
closes = closes.Merge(overflows);
}
return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
});
return publish;
}
我想这可以在Buffer方法的顶部实现,如下所示:
public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
{
return Observable.CreateWithDisposable<IList<T>>(cl =>
{
var acc = new List<T>();
return obs.Buffer(span)
.Subscribe(next =>
{
if (next.Count == 0) //no activity in time span
{
cl.OnNext(acc);
acc.Clear();
}
else
{
acc.AddRange(next);
if (acc.Count >= max) //max items collected
{
cl.OnNext(acc);
acc.Clear();
}
}
}, err => cl.OnError(err), () => { cl.OnNext(acc); cl.OnCompleted(); });
});
}
注意:我还没有测试过,但我希望它能给你一个想法。
Panic上校的解决方案近乎完美。唯一缺少的是Publish
组件,以便使解决方案也适用于冷序列。
/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
this IObservable<T> source, TimeSpan dueTime, int maxCount,
IScheduler scheduler = default)
{
if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
scheduler ??= Scheduler.Default;
return source.Publish(published =>
{
var combinedBoundaries = Observable.Merge
(
published.Throttle(dueTime, scheduler),
published.Skip(maxCount - 1)
);
return published
.Window(() => combinedBoundaries)
.SelectMany(window => window.ToList());
});
}
除了添加Publish
之外,我还用等效但更短的.Skip(maxCount - 1)
替换了原始的.Where((_, index) => index + 1 >= maxCount)
。为了完整起见,还有一个IScheduler
参数,用于配置运行计时器的调度程序。