等待IObservable获取所有元素出错

本文关键字:元素 出错 IObservable 获取 等待 | 更新日期: 2023-09-27 18:10:26

我有这样一个类:

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         var subject = new Subject<int>();
         Task.Factory.StartNew(() =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       subject.OnNext(i);
                                   }
                                   subject.OnCompleted();
                               });
         return subject;
     }
}

我还为此编写了一个测试方法:

[TestMethod]
public void TestServiceTest1()
{
   var testService = new TestService();
   var i = 0;
   var observable = testService.GetObservable(3);
   observable.Subscribe(_ =>
   {
      i++;
   });          
   observable.Wait();
   Assert.AreEqual(i, 3);
}

但有时我得到错误:序列不包含元素方法Wait().

我建议我的IObservable在test到达observable.Wait()行之前完成。我怎样才能避免这个错误?

等待IObservable获取所有元素出错

在我看来,这段代码的基本问题是IObservable 代表了如何观察某物的合同。

在这种情况下,GetObservable方法不只是返回合约,它正在立即执行工作(使用TPL)。如果您认为可以多次订阅同一个IObservable实例(这实际上发生在示例代码中,因为您第一次订阅Subscribe,第一次订阅Wait),那么这就没有意义了。这个单一的区别是我学习Rx的最大障碍。

我会这样实现这个方法(并避免使用Subject<>,因为它不是创建Observable的首选方式):

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         return Observable.Create<int>((IObserver<int> observer) =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       observer.OnNext(i);
                                   }
                                   observer.OnCompleted();
                               });
     }
}
Observable.Create也有一些有趣的覆盖,提供了更多的灵活性。