如何在复杂的分组设置中使用 Rx 限制限制持续时间选择器

本文关键字:Rx 选择器 持续时间 设置 复杂 | 更新日期: 2023-09-27 18:37:05

所以,我问如何在运行查询中更改Throttle时间跨度,詹姆斯随后回答说存在重载,实际上也提供了一个示例(一切都很好,我也从那里学到了一些技术)。

在上一个周末,我制作了一段代码,其中Throttle间隔将由传入流本身定义。作为一个实际示例,流可以是一系列结构,定义如下

struct SomeEvent
{
    public int Id;
    public DateTimeOffset TimeStamp;
}

然后接受流将检查TimeStamp字段并根据它们计算缺勤间隔。改变一点詹姆斯的链接示例,流可以像

Func<SomeEvent, IObservable<long>> throttleFactory = e => Observable.Timer(TimeSpan.FromTicks(throttleDuration.Ticks - (DateTimeOffset.Now.Ticks - e.TimeStamp.Ticks)));
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => new SomeEvent { Id = 0, TimeStamp = DateTimeOffset.Now.AddTicks(-1) }).Throttle(throttleFactory);
var subscription = sequence.Subscribe(e => Console.WriteLine(e.TimeStamp));

时间偏移,几个刻度,仅用于说明目的

然后我这里有一个更详细的例子,再次,詹姆斯帮助了很多。简而言之,这里的想法是,每个ID可以有"警报灯塔"(类似于交通信号灯),具有黄色和红色等颜色,它们在轮流点亮时由没有事件的时间定义。然后,当事件到达时,所有灯都关闭,"缺席计时器"再次从零开始。

我遇到的障碍是我似乎无法更改这个特定示例,以便它使用这个想法来产生Throttle值。特别是我似乎无法在詹姆斯的代码中在线grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))很好地进行分组。也许我对调试和所有内容都太累了,但如果有人能为正确的方向提供推动,我肯定会不胜感激!

有什么大创意?好吧,事件可以在源头加盖时间戳,但传输可能会增加需要考虑的延迟。从与分布式计算相关的 F# 用户组讨论来看(我自己也对集成问题有些熟悉),事件在某处加盖时间戳,然后通过不同的队列系统中继的场景会产生两种情况:

  1. 技术超时:在某些事件中未观察到任何事件在定义的时间间隔内终结点。
  2. 业务超时:可能存在是大量的事件,例如暂时的、持续的突发事件(甚至是重复的)来自排队系统,但它们带有时间戳"太久远了"。

<编辑:>布兰登对我在2中给出的例子提出了一个有效的观点。实际上应该如何解释"业务超时"的缺失?如果事件尚未到达,则生成的唯一有效超时事件是 1 中的"技术"超时事件。如果它们确实以突发形式到达,接收者是否对事件之间的时间差感兴趣,并希望相应地提高彩色事件?或者,如果计时器只是根据业务事件中的时间戳重置,然后在突发到达时,获取最后一个时间戳(同样,可能比允许的超时期限长)。它变得复杂和混乱,最好把这个作为一个例子。

写了这么多,我仍然有兴趣知道如何在grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))中执行连接。如果这变得复杂,我也倾向于将布兰登的帖子标记为答案(我觉得可能会得到,我觉得分组相当复杂)。

如何在复杂的分组设置中使用 Rx 限制限制持续时间选择器

听起来限制

不再是您想要的。 这是你想做的吗?

var alarms = events
    .GroupBy(e => e.Id)
    .SelectMany(grp =>
    {
        // Determine light color based on delay between events
        // go black if event arrives that is not stale
        var black = grp
            .Where(ev => (Date.Now - ev.TimeStamp) < TimeSpan.FromSeconds(2))
            .Select(ev => "black");
        // go yellow if no events after 1 second
        var yellow = black
            .Select(b => Observable.Timer(TimeSpan.FromSeconds(1)))
            .SwitchLatest()
            .Select(t => "yellow");
        // go red if no events after 2 seconds
        var red = black
            .Select(b => Observable.Timer(TimeSpan.FromSeconds(2)))
            .SwitchLatest()
            .Select(t => "red");
        return Observable
            .Merge(black, yellow, red)
            .Select(color => new { Id = grp.Key, Color = color });
    });