如何确保在反应式扩展中同步调用选择和订阅方法

本文关键字:选择 调用 同步 方法 扩展 何确保 确保 反应式 | 更新日期: 2023-09-27 18:34:00

我正在测试尝试测试 RX 并创建 Stream(),它提供两个相隔 1 秒的事件。

private IObservable<string> Stream()
{
    return Observable.Create<string>
    (
        (IObserver<string> observer) =>
        {
            observer.OnNext("a");
            observer.OnNext("b");
            observer.OnCompleted();         
            return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        }
    );
}
  _refreshFiberStream =
    Stream()
    .SubscribeOn(schedulerProvider.EventLoop) 
    .Select(DoCalc)
    .ObserveOn(schedulerProvider.Dispatcher)
    .Subscribe(Update);

和调度提供程序

 public sealed class SchedulerProvider : ISchedulerProvider
  {
    public IScheduler Dispatcher
    {
      get { return DispatcherScheduler.Current; }
    }
    public IScheduler EventLoop
    {
      get { return new EventLoopScheduler(); }
    }
    // ...
  }

我看到 DoCalc 方法为每个输入调用两次,然后是调用两次的更新方法,DoCalc,DoCalc,更新,更新。相反,我试图先对DoCalc方法进行排序,然后是更新方法。重复第二个输入的序列,因此第二个输入可以建立在第一个输入的结果之上

,DoCalc,更新,DoCalc,更新

任何想法

如何确保在反应式扩展中同步调用选择和订阅方法

首先,我会遵循将 SubscribeOn/ObserveOn 对放在最终订阅方法之前的模式。这只会为您节省很多头发。

_refreshFiberStream = Stream()
    .Select(DoCalc)
    //Always in this pattern
    .SubscribeOn(schedulerProvider.EventLoop) //SubscribeOn right before ObserveOn (or before Subscribe if no ObserveOn is used)
    .ObserveOn(schedulerProvider.Dispatcher)  //ObserveOn right before Subscribe
    .Subscribe(Update);                       //Subscribe Last

接下来,如果我重新创建您的示例并添加缺少的代码,我会得到正确/预期的输出。

void Main()
{
    var els = new EventLoopScheduler();
    var dispatcher = new EventLoopScheduler();
    Stream()
        .SubscribeOn(els) //TODO: Move to line just before ObserveOn
        .Select(DoCalc)
        .ObserveOn(dispatcher)
        .Subscribe(Update); 
}
// Define other methods and classes here
private IObservable<string> Stream()
{
    return Observable.Create<string>
    (
        (IObserver<string> observer) =>
        {
            observer.OnNext("a");
            Thread.Sleep(1000);
            observer.OnNext("b");
            observer.OnCompleted();         
            return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        }
    );
}
private string DoCalc(string val)
{
    val.Dump("DoCalc()");
    Thread.Sleep(1000);
    return val;
}
private void Update(string val)
{
    val.Dump("Update()");
}

因此,您可以重新发布带有输出的完整示例代码。将此作为单元测试、控制台应用或 LinqPad 示例执行。

猜测您的下一个问题,可能是:如何使用来自前一个值的投影(Select)的值来帮助计算下一个值?如果需要一组正在运行的计算,请使用 Scan 方法,或者,如果只需要最终结果而不是计算的每个值,请使用 Aggregate 方法。