Rx for.Net:如何将扫描与Throttle相结合
本文关键字:扫描 Throttle 相结合 for Net Rx | 更新日期: 2023-09-27 18:29:24
我的问题是:对于给定的事件序列,我希望缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存的数据,并清除缓存状态。
一个简单的方法是(不是一个工作代码,可能存在一些错误):
struct FlaggedData
{
public EventData Data { get; set; }
public bool Reset { get; set; }
}
...
IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();
var flaggedDataStream = eventsStream
.Select(data => new FlaggedData { Data = data })
.Merge(resetSignal)
.Scan(
new List<EventData>(),
(cache, flaggedData) =>
{
if (!flaggedData.Reset())
{
cache.Add(flaggedData.Data);
return cache;
}
return new List<EventData>();
})
.Throttle(SomePeriodOfTime)
.Subscribe(batch =>
{
resetSignal.OnNext(new FlaggedData { Reset = true});
ProcessBatch(batch);
});
因此,在这里,在收到任何要处理的批后,我请求重置缓存。问题是,由于Throttle
,缓存中可能有一些数据(或者我相信是这样),在这种情况下会丢失。
我想要的是一些操作,比如:
ScanWithThrottling<TAccumulate, TSource>(
IObservable<TSource> source,
Func<TAccumulate, TSource, TAccumulate> aggregate,
TimeSpan throttlingSpan)
其每次调用其订户的CCD_ 2时返回将重置累积值的可观测值。
当然,我可以写一个我自己的扩展,但问题是是否有某种方法可以使用标准的Rx操作实现同样的效果。
我认为这里有一个简单的方法。使用Buffer()
来缓冲基于如下节流阀的元素:
var buffered = source.Publish(ps =>
ps.Buffer(() => ps.Throttle(SomePeriodOfTime)));
这将缓冲元素,直到出现SomePeriodOfTime的间隙,并将它们作为列表显示。无需担心"重置"方面,而且不会丢失元素。
Publish
的使用确保了对Buffer
和每个Throttle
都可以使用的源事件有一个单独的共享订阅。油门是缓冲器关闭功能,提供一个信号,指示应启动新的缓冲器。
这是一个可测试的版本——我只是在这里转储每个缓冲区的长度,并使用Timestamp
添加定时信息,但它是原始缓冲流上的IList<T>
。请注意如何将调度程序作为参数提供给基于时间的操作以启用测试。
注意,您将需要nuget包rx测试来运行此示例,以引入rx测试框架并获得TestScheduler
和ReactiveTest
类型:
void Main()
{
var scenarios = new Scenarios();
scenarios.Scenario1();
}
public class Scenarios : ReactiveTest
{
public void Scenario1()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(100, 1),
OnNext(200, 2),
OnNext(300, 3),
OnNext(800, 4),
OnNext(900, 5),
OnNext(1400, 6),
OnNext(1600, 7),
OnNext(1700, 8),
OnNext(1800, 9));
var duration = TimeSpan.FromTicks(300);
var buffered = source.Publish(ps =>
ps.Buffer(() => ps.Throttle(duration, scheduler)));
buffered.Timestamp(scheduler).Subscribe(
x => Console.WriteLine("Timestamp: {0} Value: {1}",
x.Timestamp.Ticks, x.Value.Count()));
scheduler.Start();
}
}
本质上,您似乎想要一个缓冲而非丢弃的Throttle
。
为了解决这个问题,让我们思考一下Throttle
是如何工作的:
- 当一个值到达时,缓存它(替换以前的任何值)并启动计时器
- 如果另一个值在计时器过期之前到达,请取消计时器并转到#1
- 否则,当计时器超时时,从缓存中推送该值
您的规格:
- 当一个值到达时,将其附加到缓冲区并启动计时器
- 如果另一个值在计时器过期之前到达,请取消计时器并转到#1
- 否则,当计时器超时时,推送缓冲区并创建一个新的缓冲区
因此:
var throttled = source.Publish(p =>
from value in p
// TODO: Add to buffer (side effect)
from _ in Observable.Timer(period, scheduler).TakeUntil(p)
select buffer); // TODO: Create new buffer (side effect + contention)
请注意,如果指定的scheduler
引入了并发性,那么假设的buffer
的竞争条件和可能的争用。
如果p
观察到新的value
时Timer
已经过去,那么在天真的实现中,在推送OnNext
0并创建新buffer
之前,最新的value
可能泄漏到buffer
中。这可能是可以接受的,尽管无论你如何削减同步都是必要的。在我看来,除非你用时间戳扩展通知,或者保证所有操作的线程相关性,否则不可能获得你想要的确切语义。
也许它作为一个好的广义算子是有意义的。对我来说,它似乎足够正交,可以作为Rx基元包括在内,除非我只是在分析中出错。请考虑添加一个工作项。
我还清楚地记得在Rx MSDN论坛上讨论过类似的运营商,可能不止一次。也许值得搜索这些讨论。
编辑:我已经改变了在本例中使用同步的想法,因为无论如何,假设scheduler
引入了并发,竞争条件在这里是不可避免的
未测试示例:
public static IObservable<IList<TSource>> ThrottleBuffered<TSource>(
this IObservable<TSource> source,
TimeSpan period,
IScheduler scheduler)
{
return source.Publish(p =>
{
var buffer = new List<TSource>();
return (from _ in p.Do(buffer.Add)
from __ in Observable.Timer(period, scheduler).TakeUntil(p)
.Do(_ => buffer = new List<TSource>())
select buffer);
});
}