我如何使用TestScheduler与ReplaySubject时间窗口
本文关键字:ReplaySubject 时间 窗口 TestScheduler 何使用 | 更新日期: 2023-09-27 18:16:04
我有一个测试代码示例:
[Fact]
public void Should_only_contain_most_recent() {
var window = TimeSpan.FromMilliseconds(200);
var results = new ReplaySubject<long>(window);
results.OnNext(1);
results.OnNext(2);
System.Threading.Thread.Sleep(50);
results.OnNext(3);
System.Threading.Thread.Sleep(50);
results.OnNext(4);
System.Threading.Thread.Sleep(200);
results.OnNext(5);
results.OnCompleted();
var items = results.ToEnumerable();
Assert.True(items.SequenceEqual(new long [] { 5 }));
}
,我想把它变成使用TestScheduler从Microsoft.Reactive.Testing包,因为我想摆脱这里的睡眠。
我已经试过那样做了。
[Fact]
public void Should_only_contain_most_recent() {
var scheduler = new TestScheduler();
var window = TimeSpan.FromMilliseconds(200);
var results = new ReplaySubject<long>(window /* Never finishes when add the scheduler here.*/);
results.OnNext(1);
results.OnNext(2);
scheduler.Schedule(TimeSpan.FromMilliseconds(50), () => results.OnNext(3));
scheduler.Schedule(TimeSpan.FromMilliseconds(50), () => results.OnNext(4));
scheduler.Schedule(TimeSpan.FromMilliseconds(200), () => results.OnNext(5));
scheduler.Schedule(TimeSpan.FromMilliseconds(210), results.OnCompleted);
scheduler.Start();
var items = results.ToEnumerable();
Assert.True(items.SequenceEqual(new long[] { 5 }));
}
假设你正在尝试测试ReplaySubject的重放行为。问题是您需要启动TestScheduler两次。它只运行到所有计划的事件都被执行为止——但是我们需要在没有任何订阅者的情况下加载ReplaySubject
,以便它拥有它将重播的事件——然后在附加了订阅者后再次运行调度程序。
下面是编写此测试的惯用方法:
首先,我用我们需要的事件设置source
。通过从ReactiveTest
派生您的测试类,您可以利用OnXXX帮助器方法更容易地创建源事件。
一旦我们准备好了源流,我们可以订阅ReplaySubject
,并在TestScheduler
上调用Start
。这将运行虚拟时间,直到最后一个计划的事件(T=300ms)—我通过转储超时来演示。
现在我们使用TestScheduler
的CreateObserver
方法为结果创建一个记录器,并将其订阅到ReplaySubject
。
现在我们再次启动TestScheduler
—这将从T=300ms开始运行时钟,直到刷新新生成的计划事件。这些是ReplaySubject
需要重放的事件——那些在200ms窗口内的事件。请注意,这些将在T=300ms + 1 ticks, + 2 ticks和+ 3 ticks发出-这是因为TestScheduler
具有1 tick分辨率。
代码如下:
public class MyTests : ReactiveTest
{
[Fact]
public void Should_only_contain_most_recent()
{
var scheduler = new TestScheduler();
var source = scheduler.CreateHotObservable(
OnNext(0, 1),
OnNext(0, 2),
OnNext(TimeSpan.FromMilliseconds(50).Ticks, 3),
OnNext(TimeSpan.FromMilliseconds(100).Ticks, 4),
OnNext(TimeSpan.FromMilliseconds(300).Ticks, 5),
OnCompleted(TimeSpan.FromMilliseconds(300).Ticks, witness: 0));
var replaySubject = new ReplaySubject<int>(
TimeSpan.FromMilliseconds(200), scheduler);
source.Subscribe(replaySubject);
scheduler.Start();
/* the test scheduler is now at 300 milliseconds
* and the ReplaySubject is loaded */
Console.WriteLine(scheduler.Now.Ticks);
var results = scheduler.CreateObserver<int>();
replaySubject.Subscribe(results);
/* run the scheduler on to flush the events from the ReplaySubject */
scheduler.Start();
results.Messages.AssertEqual(
OnNext(TimeSpan.FromMilliseconds(300).Ticks + 1, 4),
OnNext(TimeSpan.FromMilliseconds(300).Ticks + 2, 5),
OnCompleted(TimeSpan.FromMilliseconds(300).Ticks + 3, witness: 0));
}
}