Observable.FromAsync vs Task.ToObservable

本文关键字:ToObservable Task vs FromAsync Observable | 更新日期: 2023-09-27 18:31:24

有没有人知道何时使用这些方法之一而不是另一种方法。他们似乎在做同样的事情,因为他们从TPL Task转换为Observable

Observable.FromAsync似乎支持取消令牌,这可能是一种细微的差异,如果可观察量被释放,则允许生成任务的方法参与合作取消。

只是想知道我是否错过了一些明显的东西,为什么你会使用一个而不是另一个。

谢谢

Observable.FromAsync vs Task.ToObservable

Observable.FromAsync接受Func<Task>Func<Task<TResult>>形式的任务工厂,在这种情况下,仅当订阅可观察量时,才会创建和执行任务。

其中.ToObservable()需要一个已经创建(因此启动)的任务。

@Sickboy答案是正确的。

  • Observable.FromAsync()将在订阅时启动任务。
  • Task.ToObservable()需要一个已经在运行的任务。

Observable.FromAsync的一个用途是控制对异步方法的多次调用的重入。

这是这两种方法不等效的示例:

//ob is some IObservable<T>
//ExecuteQueryAsync is some async method
//Here, ExecuteQueryAsync will run **serially**, the second call will start
//only when the first one is already finished. This is an important property
//if ExecuteQueryAsync doesn't support reentrancy
ob
.Select(x => Observable.FromAsync(() => ExecuteQueryAsync(x))
.Concat()
.ObserveOnDispatcher()
.Subscribe(action)

//ob is some IObservable<T>
//ExecuteQueryAsync is some async method
//Even when the `Subscribe` action order will be the same as the first 
//example because of the `Concat`, ExecuteQueryAsync calls could be     
//parallel, the second call to the method could start before the end of the 
//first call. 
.Select(x => ExecuteQueryAsync(x).ToObservable())
.Concat()
.Subscribe(action)

请注意,在第一个示例中,可能需要 ObserveOn()ObserveOnDispatcher() 方法来确保在原始调度程序上执行action,因为Observable.FromAsync不等待任务,因此在任何可用的调度程序上执行延续

查看代码,似乎(至少在某些流中)Observable.FromAsync调用.ToObservable() *。我确信它们的意图应该是语义等效的(假设您传递相同的参数,例如调度程序、取消令牌等)。

一个更适合链接/流畅的语法,一个可能单独阅读更好。无论您喜欢哪种编码风格。

*https://github.com/Reactive-Extensions/Rx.NET/blob/859e6159cb07be67fd36b18c2ae2b9a62979cb6d/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Async.cs#L727

除了能够使用 CancelToken 之外,FromAsync 还包装在 defer 中,因此这允许根据订阅时的条件更改任务逻辑。 请注意,任务不会启动,内部任务。将调用 ToObservable。 Func 确实允许您在创建任务时启动任务。