等待IObservable获取所有元素出错
本文关键字:元素 出错 IObservable 获取 等待 | 更新日期: 2023-09-27 18:10:26
我有这样一个类:
public class TestService
{
public IObservable<int> GetObservable(int max)
{
var subject = new Subject<int>();
Task.Factory.StartNew(() =>
{
for (int i = 0; i < max; i++)
{
subject.OnNext(i);
}
subject.OnCompleted();
});
return subject;
}
}
我还为此编写了一个测试方法:
[TestMethod]
public void TestServiceTest1()
{
var testService = new TestService();
var i = 0;
var observable = testService.GetObservable(3);
observable.Subscribe(_ =>
{
i++;
});
observable.Wait();
Assert.AreEqual(i, 3);
}
但有时我得到错误:序列不包含元素方法Wait().
我建议我的IObservable在test到达observable.Wait()行之前完成。我怎样才能避免这个错误?在我看来,这段代码的基本问题是IObservable
代表了如何观察某物的合同。
在这种情况下,GetObservable
方法不只是返回合约,它正在立即执行工作(使用TPL)。如果您认为可以多次订阅同一个IObservable
实例(这实际上发生在示例代码中,因为您第一次订阅Subscribe
,第一次订阅Wait
),那么这就没有意义了。这个单一的区别是我学习Rx的最大障碍。
我会这样实现这个方法(并避免使用Subject<>
,因为它不是创建Observable的首选方式):
public class TestService
{
public IObservable<int> GetObservable(int max)
{
return Observable.Create<int>((IObserver<int> observer) =>
{
for (int i = 0; i < max; i++)
{
observer.OnNext(i);
}
observer.OnCompleted();
});
}
}
Observable.Create
也有一些有趣的覆盖,提供了更多的灵活性。