转换IEnumerable到IObservable,具有最大的并行性

本文关键字:并行性 IObservable IEnumerable 转换 | 更新日期: 2023-09-27 18:19:05

我有一系列异步任务要做(比如,获取N个网页)。现在我想把它们都暴露为IObservable<T>。我当前的解决方案使用了这个问题的答案:

async Task<ResultObj> GetPage(string page) {
    Console.WriteLine("Before");
    var result = await FetchFromInternet(page);
    Console.WriteLine("After");
    return result;
}
// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable =pages.Select(GetPage).
                 Select(t => Observable.FromAsync(() => t)).Merge();
// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
    Console.WriteLine(obj.ToString());
}

问题是我不知道要提取的页数,而且它可能很大。我不想同时提出数百个请求。所以我想要一种方法来限制并行执行的任务的最大数量。是否有一种方法可以限制GetPage的并发调用数量?

有一个Merge重载,它接受一个maxConcurrent参数,但它似乎实际上并没有限制函数调用的并发性。控制台在After消息之前打印所有Before消息。

注意:我需要转换回IEnumerable<T>。我正在为一个系统编写数据源,该系统为我提供了要获取的数据描述符,并且我需要给它返回下载数据的列表。

转换IEnumerable<T>到IObservable<T>,具有最大的并行性

EDIT

下面应该可以工作。这个重载限制了并发订阅的数量。

var resultObservable = pages
  .Select(p => Observable.FromAsync(() => GetPage(p)))
  .Merge(maxConcurrent);

为了理解为什么需要这个改变,我们需要一些背景知识

  1. FromAsync返回一个可观察对象,每次它被订阅时都会调用传递的Func。这意味着,如果这个可观察对象从来没有被订阅过,它就永远不会被调用。

  2. Merge主动读取源序列,只有同时订阅 n的观测值。

有了这两部分,我们可以知道为什么原始版本会并行执行所有内容:因为(2),当Merge决定需要订阅多少可观察对象时,GetPage将已经为所有源字符串调用。

我们也可以看到为什么第二个版本工作:即使序列已经被完全迭代,(1)意味着GetPage不会被调用,直到Merge决定它需要订阅n可观察对象。这将导致只同时执行n任务的预期结果。