如何通过在原始序列上运行任务来创建Rx序列';s值

本文关键字:序列 Rx 创建 任务 原始 何通过 运行 | 更新日期: 2023-09-27 18:22:03

我有一个类型为IObservable<T>的序列和一个将T, CancellationToken映射到Task<U>的函数。从他们身上得到IObservable<U>最干净的方法是什么?

我需要以下语义:

  • 每个任务在上一个项目的任务完成后开始
  • 如果任务已取消或出现故障,则跳过该任务
  • 原始序列的顺序被严格保留

这是我看到的签名:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector
);

我还没有写任何代码,但除非有人打败我,否则我会写的。
无论如何,我不熟悉像Window这样的运算符,所以我的解决方案可能不那么优雅。

我需要C#4中的解决方案,但为了进行比较,C#5的答案也很受欢迎。


如果你好奇的话,下面是我的真实世界场景,或多或少:

Dropbox.GetImagesRecursively ()
    .ObserveOn (SynchronizationContext.Current)
    .Select (DownloadImage)
    .Subscribe (AddImageToFilePicker);

如何通过在原始序列上运行任务来创建Rx序列';s值

到目前为止,这似乎对我有效:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector)
{
    return source
        .Select (item => 
            Observable.Defer (() => 
                Observable.StartAsync (ct => selector (item, ct))
                    .Catch (Observable.Empty<U> ())
            ))
        .Concat ();
}

我们将一个延迟的基于任务的异常吞咽可观察到的内容映射到每个项目,然后将它们连接起来。


我的思考过程是这样的。

我注意到其中一个SelectMany重载几乎完全符合我的要求,甚至具有完全相同的签名。但它并不能满足我的需求:

  • 当原始项目出现时,它会创建任务,而我需要等待每个任务完成
  • 它不提供跳过已取消和出现故障的任务的选项

我查看了这个重载的实现,注意到它使用FromAsync来处理任务创建和取消:

public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
    return SelectMany_<TSource, TTaskResult, TResult> (
        source,
        x => FromAsync (ct => taskSelector (x, ct)),
        resultSelector
    );
}

我把目光转向FromAsync,看看它是如何实现的,并惊喜地发现它也是可组合的:

public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
    return Defer (() => StartAsync (functionAsync));
}

我重新使用了DeferStartAsync,同时还添加了Catch来接收错误。DeferConcat的组合确保任务相互等待并按原始顺序开始。