如何将虚拟时间用于任务.在使用反应扩展测试Observables时运行
本文关键字:扩展 测试 Observables 运行 虚拟 时间 用于 任务 | 更新日期: 2023-09-27 18:24:45
我有以下功能,我想测试
/// Items are processed asynchronously via fn as they arrive. However
/// if an item arrives before the last asynchronous operation has
/// completed then the cancellation token passed to fn will be
/// triggered enabling the task to be canceled in a best effort
/// way.
public static IObservable<U> SelectWithCancellation<T, U>
( this IObservable<T> This
, Func<CancellationToken, T, Task<U>> fn
)
{
return This
.Select(v=>Observable.FromAsync(token=>fn(token, v)))
.Switch();
}
我想测试一下,看看我能想出的最好的下面是有效的。首先,我创建了一个长期运行的任务可以取消
public Task<string> JobTask
( CancellationToken token
, string input
)
{
return Task.Factory.StartNew(() =>
{
if ( input == "C" || input == "E" )
{
while ( !token.IsCancellationRequested ) ;
}
return input;
}
);
}
然后我测试它是否真的可以工作
public class SelectWithCancelationSpec : ReactiveTest
{
TestScheduler _Scheduler = new TestScheduler();
[Fact]
public void ShouldWork()
{
var o = _Scheduler.CreateHotObservable
( OnNext(100, "A")
, OnNext(200, "B")
, OnNext(300, "C")
, OnNext(400, "D")
, OnNext(500, "E")
, OnNext(500, "F")
);
List<string> actual = new List<string>();
o
.SelectWithCancellation(JobTask)
.Subscribe(v => actual.Add(v));
var delay = 100;
_Scheduler.AdvanceTo(150);
Thread.Sleep(delay);
_Scheduler.AdvanceTo(250);
Thread.Sleep(delay);
_Scheduler.AdvanceTo(350);
Thread.Sleep(delay);
_Scheduler.AdvanceTo(450);
Thread.Sleep(delay);
_Scheduler.AdvanceTo(550);
Thread.Sleep(delay);
_Scheduler.AdvanceTo(650);
var expected = new[] { "A", "B", "D", "F" };
actual
.ShouldBeEquivalentTo(expected);
}
}
问题是我不得不将real time
引入测验这是因为我模拟的JobTask运行在一个真实的线程离开线程池并且不尊重虚拟时间测试调度程序的。如果我不拖延会发生什么在AdvanceTo
呼叫之间,我丢弃的消息比我期望在测试中,因为JobTask处理时间太长。
问题是。我如何创建一个尊重虚拟时间,并允许我测试是否可以成功降落预期消息。
关键是创建一个TestScheduler知道的tick事件流。为此,我创建了一个扩展方法
public static class TestSchedulerExtensions
{
public static IObservable<Unit> CreateTickObserver(this TestScheduler s, int startTick, int endTick, int tickStep)
{
var ticks = Enumerable.Repeat(1, Int32.MaxValue)
.Select(( v, i ) => i * tickStep + startTick)
.TakeWhile(v => v <= endTick)
.Select(tick => ReactiveTest.OnNext(tick, Unit.Default));
return s.CreateColdObservable(ticks.ToArray());
}
}
然后是另一种扩展方法,以帮助在测试条件下创建任务
public static Func<CancellationToken,U,Task<T>>
AsyncSelectorFactory<T, U>
( this TestScheduler s
, int duration
, int interval
, Func<CancellationToken, U, IObservable<Unit>, Task<T>> fn
)
{
var ticker = s.CreateTickObserver(0, duration, interval);
return ( c, u ) =>
{
return fn(c, u, ticker);
};
}
TaskFactory生成的函数可以生成任务,但在测试调度程序的控制下被传递一个ticker。那个ticker可以用来造成延迟或其他事情。
请注意,我们正在等待_Ticker来源的可观测数据来创建延迟在任务中。现在我们的测试用例看起来像
现在的测试只是
public class SelectWithCancelationSpec : ReactiveTest
{
TestScheduler _Scheduler = new TestScheduler();
[Fact]
public void ShouldWork()
{
var o = _Scheduler.CreateColdObservable
( OnNext(100, "A")
, OnNext(200, "B")
, OnNext(300, "C")
, OnNext(400, "D")
, OnNext(500, "E")
, OnNext(600, "F")
);
int cancelCount = 0;
var job = _Scheduler.AsyncSelectorFactory<string,string>
( 1000
, 10
, async ( token, input, ticker ) => {
if ( input == "C" || input == "E" )
{
await ticker.TakeWhile(v => !token.IsCancellationRequested);
cancelCount++;
}
return input;
});
var actual = _Scheduler.Start(() =>
{
return o.SelectWithCancellation(job);
}
, created: 0
, subscribed: 1
, disposed: 1000
);
var expected = new[] { "A", "B", "D", "F" };
cancelCount.Should().Be(2);
actual.Messages.Select(v=>v.Value.Value)
.ShouldBeEquivalentTo(expected);
}
}