ObserveOn用于Observables的必要性

本文关键字:必要性 Observables 用于 ObserveOn | 更新日期: 2023-09-27 18:22:00

请参阅以下代码:

 var obs = Observable.Start(() => LongRunningMethodToRetrieveData());
 obs.Subscribe(x => MethodThatMustBeOnUIThread(x));

如果我确信这两行代码是在UI线程上执行的,是否需要首先调用ObserveOn(SynchronizationContext.Current)?或者类似地,我需要检查MethodThatMustBeOnUIThread内部的InvokeRequired吗?

基本上,我能用这两行代码保证在创建订阅的线程上调用"OnNext"吗?

谢谢。

编辑:嗯,我在调试器中尝试过,"MethodThatMustBeOnUIThread"确实是从后台线程调用的。为什么会这样?我现在的假设是,默认情况下,观察发生在异步方法运行的线程上。

ObserveOn用于Observables的必要性

您需要熟悉各种Rx方法使用的默认调度程序。

Observable.Generate这样的方法确实在订阅observable的线程上运行。

另一方面,Observable.Start方法的目的是在订阅observable时异步调用lamdba操作。如果它发生在UI线程上,就不会是异步的。因此,在这种情况下,它使用ThreadPool调度器。

这可以通过使用Reflector.NET:看到

public static IObservable<TSource> Start<TSource>(Func<TSource> function)
{
    if (function == null)
    {
        throw new ArgumentNullException("function");
    }
    return function.ToAsync<TSource>()();
}
public static Func<IObservable<TResult>> ToAsync<TResult>(
    this Func<TResult> function)
{
    if (function == null)
    {
        throw new ArgumentNullException("function");
    }
    return function.ToAsync<TResult>(Scheduler.ThreadPool);
}

因此,如果您希望订阅在UI线程上运行,那么在调用Subscribe之前,知道使用的调度程序必须使用ObserveOn的形式。

此外,由于您使用的是Rx,所以我不会使用InvokeRequired——这只是混合异步编码模型。Rx拥有一切你需要玩线程很好。