我如何使用TestScheduler与ReplaySubject时间窗口

本文关键字:ReplaySubject 时间 窗口 TestScheduler 何使用 | 更新日期: 2023-09-27 18:16:04

我有一个测试代码示例:

    [Fact]
    public void Should_only_contain_most_recent() {
        var window = TimeSpan.FromMilliseconds(200);
        var results = new ReplaySubject<long>(window);
        results.OnNext(1);
        results.OnNext(2);
        System.Threading.Thread.Sleep(50);
        results.OnNext(3);
        System.Threading.Thread.Sleep(50);
        results.OnNext(4);
        System.Threading.Thread.Sleep(200);
        results.OnNext(5);
        results.OnCompleted();
        var items = results.ToEnumerable();
        Assert.True(items.SequenceEqual(new long [] { 5 }));
    }

,我想把它变成使用TestScheduler从Microsoft.Reactive.Testing包,因为我想摆脱这里的睡眠。

我已经试过那样做了。

    [Fact]
    public void Should_only_contain_most_recent() {
        var scheduler = new TestScheduler();
        var window = TimeSpan.FromMilliseconds(200);
        var results = new ReplaySubject<long>(window /* Never finishes when add the scheduler here.*/);
        results.OnNext(1);
        results.OnNext(2);
        scheduler.Schedule(TimeSpan.FromMilliseconds(50), () => results.OnNext(3));
        scheduler.Schedule(TimeSpan.FromMilliseconds(50), () => results.OnNext(4));
        scheduler.Schedule(TimeSpan.FromMilliseconds(200), () => results.OnNext(5));
        scheduler.Schedule(TimeSpan.FromMilliseconds(210), results.OnCompleted);
        scheduler.Start();
        var items = results.ToEnumerable();
        Assert.True(items.SequenceEqual(new long[] { 5 }));
    }

我如何使用TestScheduler与ReplaySubject时间窗口

假设你正在尝试测试ReplaySubject的重放行为。问题是您需要启动TestScheduler两次。它只运行到所有计划的事件都被执行为止——但是我们需要在没有任何订阅者的情况下加载ReplaySubject,以便它拥有它将重播的事件——然后在附加了订阅者后再次运行调度程序。

下面是编写此测试的惯用方法:

首先,我用我们需要的事件设置source。通过从ReactiveTest派生您的测试类,您可以利用OnXXX帮助器方法更容易地创建源事件。

一旦我们准备好了源流,我们可以订阅ReplaySubject,并在TestScheduler上调用Start。这将运行虚拟时间,直到最后一个计划的事件(T=300ms)—我通过转储超时来演示。

现在我们使用TestSchedulerCreateObserver方法为结果创建一个记录器,并将其订阅到ReplaySubject

现在我们再次启动TestScheduler—这将从T=300ms开始运行时钟,直到刷新新生成的计划事件。这些是ReplaySubject需要重放的事件——那些在200ms窗口内的事件。请注意,这些将在T=300ms + 1 ticks, + 2 ticks和+ 3 ticks发出-这是因为TestScheduler具有1 tick分辨率。

代码如下:

public class MyTests : ReactiveTest
{
    [Fact]
    public void Should_only_contain_most_recent()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(0, 1),
            OnNext(0, 2),
            OnNext(TimeSpan.FromMilliseconds(50).Ticks, 3),
            OnNext(TimeSpan.FromMilliseconds(100).Ticks, 4),
            OnNext(TimeSpan.FromMilliseconds(300).Ticks, 5),
            OnCompleted(TimeSpan.FromMilliseconds(300).Ticks, witness: 0));
        var replaySubject = new ReplaySubject<int>(
            TimeSpan.FromMilliseconds(200), scheduler);
        source.Subscribe(replaySubject);
        scheduler.Start();
        /* the test scheduler is now at 300 milliseconds
            * and the ReplaySubject is loaded */
        Console.WriteLine(scheduler.Now.Ticks);
        var results = scheduler.CreateObserver<int>();
        replaySubject.Subscribe(results);
        /* run the scheduler on to flush the events from the ReplaySubject */
        scheduler.Start();
        results.Messages.AssertEqual(
            OnNext(TimeSpan.FromMilliseconds(300).Ticks + 1, 4),
            OnNext(TimeSpan.FromMilliseconds(300).Ticks + 2, 5),
            OnCompleted(TimeSpan.FromMilliseconds(300).Ticks + 3, witness: 0));
    }
}