如何对流进行分区 (GroupBy) 并监视 Rx 中元素在某些时间段内的缺失情况
本文关键字:元素 时间段 情况 监视 对流 何对流 分区 GroupBy Rx | 更新日期: 2023-09-27 18:34:57
前几天我一直在尝试编写一个 Rx 查询来处理来自源的事件流并检查是否存在一些 ID。缺席的定义是,有一系列的时间窗口(例如,从 9:00 到 17:00 的所有日子(,在此期间,流中最多应该有 20 分钟没有 ID。更复杂的是,应按ID定义缺勤时间。例如,假设三种事件 A、B 和 C 出现在组合事件流中(A、A、B、C、A 、C、B 等(,则可以定义
- 每天从 9:00 到 10:00 监控事件,最多缺席事件 10 分钟。
- 每天从 9:00 到 11:00 监控 B 事件,最多缺席事件 5 分钟。
- 每天 12:00 至 15:00 监控 C 事件,最长缺席事件时间为 30 分钟。
我想我需要首先将流分区成按 GroupBy 分隔事件,然后使用缺席规则处理生成的单独流。我已经在 Rx 论坛上考虑了Microsoft(非常感谢 Dave(,并且我有一些工作代码来生成规则并进行缺勤检查,但我正在努力,例如,如何将其与分组相结合。
因此,在没有进一步演讲的情况下,到目前为止被黑客入侵的代码:
//Some sample data bits representing the events.
public class FakeData
{
public int Id { get; set; }
public string SomeData { get; set; }
}
//Note the Now part in DateTime to zero the clock time and have only the date. The purpose is to create start-end pairs of times, e.g. 9:00-17:00.
//The alarm start and end time points should match themselves pairwise, could be pairs of values...
var maxDate = DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14);
var startDate = DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0);
var alarmStartPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d))).ToList();
var alarmEndPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d)).AddHours(5)).ToList();
还有一个查询来做缺勤检查而不对它们进行分组,这是我的症结之一。<编辑:>也许我应该将时间点分组成对并添加一个 ID 并在查询中使用生成的三元组......
dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100))
select new FakeData
{
Id = new Random().Next(1, 5),
SomeData = DateTimeOffset.Now.ToString()
};
var startPointOfTimeChanges = alarmStartPeriods.ToObservable();
var endPointOfTimeChanges = alarmEndPeriods.ToObservable();
var durations = startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges, (start, end) => new { start, end });
var maximumInactivityTimeBeforeAlarmSignal = TimeSpan.FromMilliseconds(250);
timer = (from duration in durations
select (from _ in Observable.Timer(DateTime.Now)
from x in dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal).TakeUntil(duration.end)
select x)).Switch();
timer.Subscribe(x => Debug.WriteLine(x.SomeData));
问题:
- 我应该如何尝试按 ID 对传入数据进行分组,并且仍然能够定义事件的缺失?
- 我注意到的一件事是,如果警报期的起点是过去(例如,查询在 10:00 开始,当规则说在 9:00 开始监控时(,查询将不会开始。我想,开始时间应该推到现在的时间。是否有一些标准方法可以做到这一点,或者我应该只引入一个条件?
我能想到的其他问题会很好(:)自娱自乐(:
- 如何点按每个 ID 发生的最新事件?
- 如何动态更改变量(正如 Dave 在 MS 论坛中已经提到的那样(?
- 然后,最后,批处理事件并存储在某个地方(例如数据库(,就像PeteGoo博客中的这个奇妙示例一样?
我能想到的其他选择是显式使用
System.Threading.Timers
和ConcurrentDictionary
,但需要不断学习!关于詹姆斯的输入答案,这里有一个快速解释它是如何工作的以及我打算如何使用它。
首先,在第一个事件出现之前,可观察量将不做任何事情。因此,如果监视应立即开始,则需要添加其他一些 Rx 功能或触发虚拟事件。我相信不是问题。
其次,将从任何新 ID 的 alarmInterval 中获取一个新的超时变量。在这里,即使是一个已经缺席太久并触发警报的人。
我认为这很有效,因为人们可以订阅这个可观察的内容并做一些有副作用的事情。一些示例包括设置标志、发送信号以及哪些业务规则具有标志。此外,保持适当的锁定等,应该很容易根据预定义的警报规则提供新的时间跨度,并具有单独的缺席时间和时间窗口。
我将不得不研究与此相关的其他概念,以便更好地掌握事物。但我的主要担忧是满足了这一点。生活很好,很美好。:-)
已编辑 - 改进了代码,简化了使用TakeLast
SelectMany
。
我写了一篇关于检测断开连接的客户端的博客文章 - 如果您将帖子中的 timeToHold 变量替换为下面的 alarmInterval 之类的函数以获取基于客户端 ID 的限制时间跨度,这将同样适用于您的场景。
例如:
// idStream is an IObservable<int> of the input stream of IDs
// alarmInterval is a Func<int, TimeSpan> that gets the interval given the ID
var idAlarmStream = idStream
.GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key)))
.SelectMany(grp => grp.TakeLast(1));
这为您提供了持续监控的基本功能,而无需查看活动监控周期。
为了获得监视器窗口功能,我会扭转局面并使用 WHERE 过滤上述输出,该输出检查发出的 ID 是否落在其监视时间窗口内。这样可以更轻松地处理不断变化的监视周期。
您可以通过将每个监视窗口转换为流并将其与警报流组合来做一些更花哨的事情,但我不相信额外复杂性的好处。
alarmInterval 函数还将为您提供动态警报间隔元素,因为它可以返回新值,但这些值仅在该 ID 的警报响起从而结束其当前组后生效。
--- 在这里进行一些理论研究---
要获得这种完全动态,您必须以某种方式结束该组 - 您可以通过几种方式做到这一点。
一种是使用 Select 将 idStream 投影到包含 ID 和全局计数器值的自定义类型的流中。为此类型提供适当的相等实现,以便它可以与 GroupByTill 正确配合使用。
现在,每次更改警报间隔时,请更改计数器。这将导致为每个 ID 创建新组。然后,可以在最终筛选器中添加其他检查,以确保输出事件具有最新的计数器值。