使用 RX 查询,如何获取哪些记录在每秒 3 秒的窗口中具有相同的状态
本文关键字:窗口 状态 记录 何获取 查询 RX 获取 使用 | 更新日期: 2023-09-27 18:26:02
我有几天时间看RX,我读了很多书;我读过介绍Rx;我也看过 101 个 RX 样本,还有很多其他地方,但我无法弄清楚这一点。这听起来很简单,但我无法得到我需要的东西:我需要知道哪个"ID"在状态"已开始"中"卡"至少 30 分钟。
我有一个看起来像这样的类MyInfo:
public class MyInfo
{
public string ID { get; set; }
public string Status { get; set; }
}
我已经编写了一个主题来帮助我像这样测试:
var subject = new Subject<MyInfo>();
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "PHASE2" });
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
subject.OnCompleted();
到目前为止,我的查询和订阅如下所示:(我在示例中使用了秒(
var q8 = from e in subject
group e by new { ID = e.ID, Status = e.Status } into g
from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(3)
, timeShift: TimeSpan.FromSeconds(1))
select new
{
ID = g.Key.ID,
Status = g.Key.Status,
count = w.Count
};
var subsc = q8.Subscribe(a => Console.WriteLine("{0} {1} {2}", a.ID, a.Status, a.count));
现在,我可以得到一个输出,告诉我 ID 以及该 ID 在一段时间内看到的状态。
ID 状态计数1 开始 32 阶段1 23 停止 34 开始 24 阶段2 12 停止 1
我接下来要做的是,首先,丢弃那些在间隔内看到超过 1 个状态的状态(因此 ID 2 和 4 将被消除(,在剩下的那些中,丢弃那些状态不是"已启动"的(这将消除 ID 3(。ID 1 是我要找的记录。
这是解决问题的最佳方法吗?我如何实现该查询?
另外,我怎样才能让我的主题以不同的间隔发送消息,以便我可以测试窗口。
谢谢!
解决方案的实现
我解决这个问题的方法是生成一个扩展方法,该方法接受IObservable<MyInfo>
输入流(可以是Subject
(和IScheduler
,并返回已卡住的项目流。它看起来像这样:
public static class ObservableExtensions
{
public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
return source.Publish(pub =>
pub.Where(x => x.Status == "STARTED")
.SelectMany(
x => Observable.Return(x)
.Delay(TimeSpan.FromMinutes(30), scheduler)
.TakeUntil(pub.Where(y => y.Id == x.Id
&& y.Status != "STARTED"))));
}
}
这里有很多Rx!让我们一点一点地来...
一般的想法是,我们希望查找处于"STARTED"状态的MyInfo
实例(以下简称"项目"(,这些实例被具有匹配 ID 的非启动项目"未应答"30 分钟。
暂时忽略Publish
位,我会回到那个。想象一下,pub
变量是source
。
步骤 1 - 筛选"已启动"项
pub.Where(x => x.Status == "STARTED")
这一点很容易,我们只是过滤了源以获得"已启动"项。
第 2 步 - 将每个项目转换为自己的延迟流
这有点棘手。从一件物品出现的那一刻起,我们就知道 30 分钟后我们想回答这个问题,"另一个物品出现来解开它了吗?为了帮助我们做到这一点,我们将创建一个新的流,该流将在 30 分钟后仅发出信息本身。我们的计划是,如果符合条件的项目出现以解开它,则缩短此流。假设x
是信息,那么我们这样做:
Observable.Return(x)
.Delay(TimeSpan.FromMinutes(30), scheduler)
Observable.Return
将项目转换为可观察的流,该流立即为该项目发出OnNext
,然后OnComplete
s。尽管看起来有点毫无价值,但它实际上是一个非常有用的构建块(对于一些高级阅读,这有时被称为单元函数,IObservable
monad 结构的关键部分(,因为它使我们能够从任何项目创建新的可观察流。一旦我们有了这个流,我们就会Delay
它,以便该项目将在 30 分钟后出现。
请注意我们在调用Delay
时如何指定调度程序 - 这将有助于使用模拟时间进行测试。
步骤 3 - 如果项目"未卡住",则放弃延迟流
现在,如果一个让我们"解卡"的合格项目在 30 分钟结束之前到达,我们将不再对这个项目感兴趣。
"取消粘附"项目的资格标准是与 ID 匹配且没有"已开始"状态的标准 - 我假设该项目的另一个"已开始"副本不足以解开卡住的项目!(如果这是错误的,那么任何状态都有资格取消粘连(。如果一个取消粘附的项目到达原始的未延迟流 ( pub
(,我们使用TakeUntil
在延迟的项目有机会出现之前终止延迟流:
.TakeUntil(pub.Where(y => y.Id == x.Id
&& y.Status != "STARTED"))));
步骤4 - 整理所有这些项目流
现在,我们有点混乱,因为我们将每个项目都投影到它自己的流中 - 我们有一个流流,我们需要以某种方式返回到单个流。为此,我们使用SelectMany
,(高级阅读:相当于 monad 绑定(。SelectMany
在这里为我们做了两项工作 - 它允许我们将项目映射到流中,并将生成的流流一次性平展回单个流。我们将使用的映射函数是我们刚刚构建的函数 - 所以到目前为止,将它们放在一起,我们有:
pub.Where(x => x.Status == "STARTED")
.SelectMany(
x => Observable.Return(x)
.Delay(TimeSpan.FromMinutes(30), scheduler)
.TakeUntil(pub.Where(y => y.Id == x.Id
&& y.Status != "STARTED"))));
步骤 5 - 通过使用Publish
确保我们不会弄乱源流
我们看起来不错,但上面左边有一个微妙的问题需要解决。您会注意到我们订阅了源(pub
(流不止一个 - 在初始Where
过滤器和TakeUntil
中。
这样做的问题是,多次订阅同一流可能会产生意想不到的后果。有些流是"冷"的 - 每个订阅者都开始自己的事件链。在时间是关键因素的查询中,这可能特别棘手。也可能有其他问题 - 但我不想在这里太深入。基本上,我们需要非常小心,我们只订阅一次源流。Publish()
方法可以为我们做到这一点 - 它将订阅一次源,然后将源多播给许多订阅者。
因此,lambda 中出现的pub
是我们可以多次安全订阅的源的"安全"副本。
如何解决测试
您需要为此控制时间 - 最好的选择是在 nuget 包rx-testing
中使用专门构建的 Rx 测试设施。有了这个,您可以使用TestScheduler
来控制时间和安排测试事件。
这是一个简单的测试,可以检测一个简单的卡住的项目。
public class StuckDetectorTests : ReactiveTest
{
[Test]
public void FindSingleStuckItem()
{
var testScheduler = new TestScheduler();
var xs = testScheduler.CreateColdObservable(
OnNext(TimeSpan.FromMinutes(5).Ticks, MyInfo.Started("1")));
var results = testScheduler.CreateObserver<MyInfo>();
xs.StuckInfos(testScheduler).Subscribe(results);
testScheduler.Start();
results.Messages.AssertEqual(
OnNext(TimeSpan.FromMinutes(35).Ticks, MyInfo.Started("1")));
}
}
确保从ReactiveTest
派生测试类,以利用OnXXX
帮助程序方法。
我还在MyInfo
上创建了一些有用的工厂方法,并实现了相等重载以使测试更容易。
完整的代码相当冗长 - 我发布了一个要点,在这里进行了更多测试:https://gist.github.com/james-world/62dca2fe2f91531a0401
这里还有一篇关于测试 Rx 的好博客文章。