制作IObservable它使用async/await以原始顺序返回已完成的任务

本文关键字:原始 await 顺序 返回 任务 已完成 async IObservable 制作 | 更新日期: 2023-09-27 18:06:51

假设你有一个包含100个url的列表,你想下载它们,解析响应并通过IObservable推送结果:

public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    return urls
        .ToObservable()
        .Select(async url =>
        {
            var bytes = await this.DownloadImage(url);
            var image = await this.ParseImage(bytes);
            return image;
        });
}

我有一些问题。

一个是,在同一时间用100个请求冲击服务器是不礼貌的——理想情况下,你可能会在给定时刻限制6个请求。但是,如果我添加Buffer调用,由于Select中的异步lambda,所有内容仍然同时触发。

此外,返回结果的顺序将与url的输入顺序不同,这很糟糕,因为图像是将在UI上显示的动画的一部分。

我已经尝试了各种方法,我有一个有效的解决方案,但感觉很复杂:

public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    var semaphore = new SemaphoreSlim(6);
    return Observable.Create<ImageSource>(async observable =>
    {
        var tasks = urls
            .Select(async url =>
            {
                await semaphore.WaitAsync();
                var bytes = await this.DownloadImage(url);
                var image = await this.ParseImage(url);
            })
            .ToList();
        foreach (var task in tasks)
        {
            observable.OnNext(await task);
        }
        observable.OnCompleted();
    });
}

它工作,但现在我做的是Observable.Create而不是IObservable.Select,我必须弄乱信号量。此外,在UI上运行的其他动画停止时,这是运行(他们基本上只是DispatcherTimer实例),所以我想我一定是做错了什么。

制作IObservable<T>它使用async/await以原始顺序返回已完成的任务

试一试:

urls.ToObservable()
    .Select(url => Observable.FromAsync(async () => {
        var bytes = await this.DownloadImage(url);
        var image = await this.ParseImage(bytes);
        return image;        
    }))
    .Merge(6 /*at a time*/);
我们在这里做什么?

对于每个URL,我们都创建了一个冷观察对象(也就是说,它不会做任何事情,直到有人调用Subscribe)。FromAsync返回一个可观察对象,当你订阅它时,运行你给它的异步块。因此,我们选择URL到一个对象,它会为我们做的工作,但只有当我们问它。

那么,我们的结果是一个IObservable<IObservable<Image>>——一个未来结果流。我们想把这个流平铺成一个结果流,所以我们使用Merge(int)。合并操作符将一次订阅n项,当它们返回时,我们将订阅更多项。即使url列表非常大,Merge所缓冲的项目也只是一个url和一个Func对象(即要做什么的描述),所以相对较小。

实现这一目标的一种方法是使用Observable.StartAsync方法、SemaphoreSlimConcat运算符。Observable.StartAsync将创建热可观察对象(立即启动),SemaphoreSlim将限制图像的下载/解析,Concat将按原始顺序收集图像。

public IObservable<ImageSource> GetImages(IEnumerable<string> urls, int maxConcurrency)
{
    return Observable.Using(() => new SemaphoreSlim(maxConcurrency), semaphore =>
        urls
            .ToObservable()
            .Select(url => Observable.StartAsync(async cancellationToken =>
            {
                await semaphore.WaitAsync(cancellationToken);
                try
                {
                    var bytes = await this.DownloadImage(url);
                    var image = await this.ParseImage(bytes);
                    return image;
                }
                finally { semaphore.Release(); }
            }))
            .Concat());
}

您可以考虑将cancellationToken参数传递给DownloadImageParseImage方法,以避免在后台运行"即发即弃"操作,以防产生的IObservable<ImageSource>由于任何原因(错误或取消订阅)而过早终止。