使用 RX 查询,如何获取哪些记录在每秒 3 秒的窗口中具有相同的状态

本文关键字:窗口 状态 记录 何获取 查询 RX 获取 使用 | 更新日期: 2023-09-27 18:26:02

我有几天时间看RX,我读了很多书;我读过介绍Rx;我也看过 101 个 RX 样本,还有很多其他地方,但我无法弄清楚这一点。这听起来很简单,但我无法得到我需要的东西:我需要知道哪个"ID"在状态"已开始"中"卡"至少 30 分钟。

我有一个看起来像这样的类MyInfo:

public class MyInfo
{
    public string ID { get; set; }
    public string Status { get; set; }
}

我已经编写了一个主题来帮助我像这样测试:

        var subject = new Subject<MyInfo>();
        subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
        subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
        subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
        subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
        subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
        subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
        subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
        subject.OnNext(new MyInfo() { ID = "4", Status = "PHASE2" });
        subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
        subject.OnNext(new MyInfo() { ID = "2", Status = "STOPPED" });
        subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
        subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
        subject.OnCompleted();

到目前为止,我的查询和订阅如下所示:(我在示例中使用了秒(

var q8 = from e in subject
                 group e by new { ID = e.ID, Status = e.Status } into g
                 from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(3)
                                   , timeShift: TimeSpan.FromSeconds(1))
                 select new
                 {
                     ID = g.Key.ID,
                     Status = g.Key.Status,
                     count = w.Count
                 };
        var subsc = q8.Subscribe(a => Console.WriteLine("{0} {1} {2}", a.ID, a.Status, a.count));

现在,我可以得到一个输出,告诉我 ID 以及该 ID 在一段时间内看到的状态。

ID 状态计数1 开始 32 阶段1 23 停止 34 开始 24 阶段2 12 停止 1

我接下来要做的是,首先,丢弃那些在间隔内看到超过 1 个状态的状态(因此 ID 2 和 4 将被消除(,在剩下的那些中,丢弃那些状态不是"已启动"的(这将消除 ID 3(。ID 1 是我要找的记录。

这是解决问题的最佳方法吗?我如何实现该查询?

另外,我怎样才能让我的主题以不同的间隔发送消息,以便我可以测试窗口。

谢谢!

使用 RX 查询,如何获取哪些记录在每秒 3 秒的窗口中具有相同的状态

解决方案的实现

我解决这个问题的方法是生成一个扩展方法,该方法接受IObservable<MyInfo>输入流(可以是Subject(和IScheduler,并返回已卡住的项目流。它看起来像这样:

public static class ObservableExtensions
{
    public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(pub =>
            pub.Where(x => x.Status == "STARTED")
                .SelectMany(
                    x => Observable.Return(x)
                        .Delay(TimeSpan.FromMinutes(30), scheduler)
                        .TakeUntil(pub.Where(y => y.Id == x.Id
                                                  && y.Status != "STARTED"))));
    }
}

这里有很多Rx!让我们一点一点地来...

一般的想法是,我们希望查找处于"STARTED"状态的MyInfo实例(以下简称"项目"(,这些实例被具有匹配 ID 的非启动项目"未应答"30 分钟。

暂时忽略Publish位,我会回到那个。想象一下,pub变量是source

步骤 1 - 筛选"已启动"项

pub.Where(x => x.Status == "STARTED")

这一点很容易,我们只是过滤了源以获得"已启动"项。

第 2 步 - 将每个项目转换为自己的延迟流

这有点棘手。从一件物品出现的那一刻起,我们就知道 30 分钟后我们想回答这个问题,"另一个物品出现来解开它了吗?为了帮助我们做到这一点,我们将创建一个新的流,该流将在 30 分钟后仅发出信息本身。我们的计划是,如果符合条件的项目出现以解开它,则缩短此流。假设x是信息,那么我们这样做:

Observable.Return(x)
          .Delay(TimeSpan.FromMinutes(30), scheduler)

Observable.Return 将项目转换为可观察的流,该流立即为该项目发出OnNext,然后OnComplete s。尽管看起来有点毫无价值,但它实际上是一个非常有用的构建块(对于一些高级阅读,这有时被称为单元函数,IObservable monad 结构的关键部分(,因为它使我们能够从任何项目创建新的可观察流。一旦我们有了这个流,我们就会Delay它,以便该项目将在 30 分钟后出现。

请注意我们在调用Delay时如何指定调度程序 - 这将有助于使用模拟时间进行测试。

步骤 3 - 如果项目"未卡住",则放弃延迟流

现在,如果一个让我们"解卡"的合格项目在 30 分钟结束之前到达,我们将不再对这个项目感兴趣。

"取消粘附"项目的资格标准是与 ID 匹配且没有"已开始"状态的标准 - 我假设该项目的另一个"已开始"副本不足以解开卡住的项目!(如果这是错误的,那么任何状态都有资格取消粘连(。如果一个取消粘附的项目到达原始的未延迟流 ( pub (,我们使用TakeUntil在延迟的项目有机会出现之前终止延迟流:

.TakeUntil(pub.Where(y => y.Id == x.Id
                          && y.Status != "STARTED"))));

步骤4 - 整理所有这些项目流

现在,我们有点混乱,因为我们将每个项目都投影到它自己的流中 - 我们有一个流流,我们需要以某种方式返回到单个流。为此,我们使用SelectMany(高级阅读:相当于 monad 绑定(。SelectMany在这里为我们做了两项工作 - 它允许我们将项目映射到流中,并将生成的流流一次性平展回单个流。我们将使用的映射函数是我们刚刚构建的函数 - 所以到目前为止,将它们放在一起,我们有:

pub.Where(x => x.Status == "STARTED")
                .SelectMany(
                 x => Observable.Return(x)
                                .Delay(TimeSpan.FromMinutes(30), scheduler)
                                .TakeUntil(pub.Where(y => y.Id == x.Id
                                                    && y.Status != "STARTED"))));

步骤 5 - 通过使用Publish确保我们不会弄乱源流

我们看起来不错,但上面左边有一个微妙的问题需要解决。您会注意到我们订阅了源(pub(流不止一个 - 在初始Where过滤器和TakeUntil中。

这样做的问题是,多次订阅同一流可能会产生意想不到的后果。有些流是"冷"的 - 每个订阅者都开始自己的事件链。在时间是关键因素的查询中,这可能特别棘手。也可能有其他问题 - 但我不想在这里太深入。基本上,我们需要非常小心,我们只订阅一次源流。Publish() 方法可以为我们做到这一点 - 它将订阅一次源,然后将源多播给许多订阅者。

因此,lambda 中出现的pub是我们可以多次安全订阅的源的"安全"副本。

如何解决测试

您需要为此控制时间 - 最好的选择是在 nuget 包rx-testing中使用专门构建的 Rx 测试设施。有了这个,您可以使用TestScheduler来控制时间和安排测试事件。

这是一个简单的测试,可以检测一个简单的卡住的项目。

public class StuckDetectorTests : ReactiveTest
{
    [Test]
    public void FindSingleStuckItem()
    {
        var testScheduler = new TestScheduler();
        var xs = testScheduler.CreateColdObservable(
            OnNext(TimeSpan.FromMinutes(5).Ticks, MyInfo.Started("1")));
        var results = testScheduler.CreateObserver<MyInfo>();
        xs.StuckInfos(testScheduler).Subscribe(results);
        testScheduler.Start();
        results.Messages.AssertEqual(
            OnNext(TimeSpan.FromMinutes(35).Ticks, MyInfo.Started("1")));
    }
}

确保从ReactiveTest派生测试类,以利用OnXXX帮助程序方法。

我还在MyInfo上创建了一些有用的工厂方法,并实现了相等重载以使测试更容易。

完整的代码相当冗长 - 我发布了一个要点,在这里进行了更多测试:https://gist.github.com/james-world/62dca2fe2f91531a0401

这里还有一篇关于测试 Rx 的好博客文章。