反应性扩展:事件的频率

本文关键字:事件 频率 扩展 | 更新日期: 2023-09-27 18:05:31

假设我有一个类MyClass {string id, string eventType, datetime ts}

ts是事件的时间戳,Id是我要计算的频率

我有一个MyClass的热可观察对象,我想计算在过去30秒内每个stringId接收的事件数

如果事件数超过5,我引发另一个MyClass事件(具有相同的Id, eventType ="New"),如果它再次下降到3以下,我需要更新先前引发的事件(具有相同的Id, eventType ="New")。

我想我需要使用滑动窗口,我已经走了这么远

public static IObservable<MyClass> CountFrequency(this IObservable<MyClass> source, TimeSpan withinPeriod, string marker)
{
    // var scheduler = new HistoricalScheduler();
    //  var driveSchedule = source.Subscribe(e => scheduler.AdvanceTo(e.Timestamp)); 
    return source.Window(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5))
        .SelectMany(sl => sl)
        .GroupBy(a => a.id)
        .SelectMany(go => go
        .Aggregate(new MyClass(), (acc, evt) => CustomAggFrequency(acc, evt, marker))
        .Select(count => count)));
}

我不能理解

a)如何将调度器与数据的时间戳而不是系统时间联系起来b)如何编写CustomAggFrequency的逻辑

有什么建议

反应性扩展:事件的频率

这样不行吗?你的问题用这样的措词很难弄清楚你想要什么。

var span = TimeSpan.FromSeconds(30);
var shift = TimeSpan.FromSeconds(5);
var query = source
    .Window(span, shift)
    .Select(window => window
        .GroupBy(item => item.id)
        .ToDictionary(g => x.Key, g => g.Count() / span.TotalSeconds));

这个query将每5秒发出一个字典,在过去30秒内将每个id映射到其以赫兹为单位的频率。类型为IObservable<Dictionary<string, double>>