如何在复杂的分组设置中使用 Rx 限制限制持续时间选择器
本文关键字:Rx 选择器 持续时间 设置 复杂 | 更新日期: 2023-09-27 18:37:05
所以,我问如何在运行查询中更改Throttle
时间跨度,詹姆斯随后回答说存在重载,实际上也提供了一个示例(一切都很好,我也从那里学到了一些技术)。
在上一个周末,我制作了一段代码,其中Throttle
间隔将由传入流本身定义。作为一个实际示例,流可以是一系列结构,定义如下
struct SomeEvent
{
public int Id;
public DateTimeOffset TimeStamp;
}
然后接受流将检查TimeStamp
字段并根据它们计算缺勤间隔。改变一点詹姆斯的链接示例,流可以像
Func<SomeEvent, IObservable<long>> throttleFactory = e => Observable.Timer(TimeSpan.FromTicks(throttleDuration.Ticks - (DateTimeOffset.Now.Ticks - e.TimeStamp.Ticks)));
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => new SomeEvent { Id = 0, TimeStamp = DateTimeOffset.Now.AddTicks(-1) }).Throttle(throttleFactory);
var subscription = sequence.Subscribe(e => Console.WriteLine(e.TimeStamp));
时间偏移,几个刻度,仅用于说明目的
然后我这里有一个更详细的例子,再次,詹姆斯帮助了很多。简而言之,这里的想法是,每个ID可以有"警报灯塔"(类似于交通信号灯),具有黄色和红色等颜色,它们在轮流点亮时由没有事件的时间定义。然后,当事件到达时,所有灯都关闭,"缺席计时器"再次从零开始。
我遇到的障碍是我似乎无法更改这个特定示例,以便它使用这个想法来产生Throttle
值。特别是我似乎无法在詹姆斯的代码中在线grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
很好地进行分组。也许我对调试和所有内容都太累了,但如果有人能为正确的方向提供推动,我肯定会不胜感激!
有什么大创意?好吧,事件可以在源头加盖时间戳,但传输可能会增加需要考虑的延迟。从与分布式计算相关的 F# 用户组讨论来看(我自己也对集成问题有些熟悉),事件在某处加盖时间戳,然后通过不同的队列系统中继的场景会产生两种情况:
- 技术超时:在某些事件中未观察到任何事件在定义的时间间隔内终结点。
- 业务超时:可能存在是大量的事件,例如暂时的、持续的突发事件(甚至是重复的)来自排队系统,但它们带有时间戳"太久远了"。
<编辑:>布兰登对我在2中给出的例子提出了一个有效的观点。实际上应该如何解释"业务超时"的缺失?如果事件尚未到达,则生成的唯一有效超时事件是 1 中的"技术"超时事件。如果它们确实以突发形式到达,接收者是否对事件之间的时间差感兴趣,并希望相应地提高彩色事件?或者,如果计时器只是根据业务事件中的时间戳重置,然后在突发到达时,获取最后一个时间戳(同样,可能比允许的超时期限长)。它变得复杂和混乱,最好把这个作为一个例子。
写了这么多,我仍然有兴趣知道如何在grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
中执行连接。如果这变得复杂,我也倾向于将布兰登的帖子标记为答案(我觉得可能会得到,我觉得分组相当复杂)。
不再是您想要的。 这是你想做的吗?
var alarms = events
.GroupBy(e => e.Id)
.SelectMany(grp =>
{
// Determine light color based on delay between events
// go black if event arrives that is not stale
var black = grp
.Where(ev => (Date.Now - ev.TimeStamp) < TimeSpan.FromSeconds(2))
.Select(ev => "black");
// go yellow if no events after 1 second
var yellow = black
.Select(b => Observable.Timer(TimeSpan.FromSeconds(1)))
.SwitchLatest()
.Select(t => "yellow");
// go red if no events after 2 seconds
var red = black
.Select(b => Observable.Timer(TimeSpan.FromSeconds(2)))
.SwitchLatest()
.Select(t => "red");
return Observable
.Merge(black, yellow, red)
.Select(color => new { Id = grp.Key, Color = color });
});