反应性扩展:事件的频率
本文关键字:事件 频率 扩展 | 更新日期: 2023-09-27 18:05:31
假设我有一个类MyClass {string id, string eventType, datetime ts}
ts是事件的时间戳,Id是我要计算的频率
我有一个MyClass的热可观察对象,我想计算在过去30秒内每个stringId接收的事件数
如果事件数超过5,我引发另一个MyClass事件(具有相同的Id, eventType ="New"),如果它再次下降到3以下,我需要更新先前引发的事件(具有相同的Id, eventType ="New")。
我想我需要使用滑动窗口,我已经走了这么远
public static IObservable<MyClass> CountFrequency(this IObservable<MyClass> source, TimeSpan withinPeriod, string marker)
{
// var scheduler = new HistoricalScheduler();
// var driveSchedule = source.Subscribe(e => scheduler.AdvanceTo(e.Timestamp));
return source.Window(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5))
.SelectMany(sl => sl)
.GroupBy(a => a.id)
.SelectMany(go => go
.Aggregate(new MyClass(), (acc, evt) => CustomAggFrequency(acc, evt, marker))
.Select(count => count)));
}
我不能理解
a)如何将调度器与数据的时间戳而不是系统时间联系起来b)如何编写CustomAggFrequency的逻辑
有什么建议
这样不行吗?你的问题用这样的措词很难弄清楚你想要什么。
var span = TimeSpan.FromSeconds(30);
var shift = TimeSpan.FromSeconds(5);
var query = source
.Window(span, shift)
.Select(window => window
.GroupBy(item => item.id)
.ToDictionary(g => x.Key, g => g.Count() / span.TotalSeconds));
这个query
将每5秒发出一个字典,在过去30秒内将每个id
映射到其以赫兹为单位的频率。类型为IObservable<Dictionary<string, double>>