Rx扩展并行.ForEach节流

本文关键字:节流 ForEach 并行 扩展 Rx | 更新日期: 2023-09-27 17:51:20

我遵循这个问题的答案:Rx扩展:Parallel.ForEach在哪里?

我遇到的问题是,它似乎是分配一个新的线程每个请求,而使用Parallel.ForEach做了相当少。

并行运行的进程是相当内存密集型的,所以如果我试图一次处理数百个项目,那么提供给链接问题的答案很快就会看到我的内存耗尽。

是否有一种方法可以修改这个答案来限制在任何给定时间内正在完成的项目的数量?

我已经看了WindowBuffer操作,我的代码看起来像这样:

return inputs.Select(i => new AccountViewModel(i))
    .ToObservable()
    .ObserveOn(RxApp.MainThreadScheduler)
    .ToList()
    .Do(l =>
    {
        using (Accounts.SuppressChangeNotifications())
        {
            Accounts.AddRange(l);
        }
    })
    .SelectMany(x => x)
    .SelectMany(acc => Observable.StartAsync(async () =>
    {
        var res = await acc.ProcessAsync(config, m, outputPath);
        processed++;
        var prog = ((double) processed/inputs.Count())*100.0;
        OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
        OverallProgress.Progress.OnNext(prog);
        return res;
    }))
    .All(x => x);

理想情况下,我希望能够将其批处理成帐户视图模型的块,然后我调用ProcessAsync方法,并且只有在所有批处理完成后才能继续前进。

理想情况下,我希望它这样,即使只有一个批处理完成,它继续前进,但只保持相同的批大小。

如果我有一批5和1完成,我希望另一个开始,但只有一个,直到更多的空间可用。

Rx扩展并行.ForEach节流

像往常一样,Paul Betts回答了一个类似的问题,解决了我的问题:

问题:基于特定数字的响应式扩展并行处理

有一些关于使用Observable.Defer的信息,然后合并成批,使用我已经修改了我以前的代码,如下所示:

return inputs.Select(i => new AccountViewModel(i))
    .ToObservable()
    .ObserveOn(RxApp.MainThreadScheduler)
    .ToList()
    .Do(l =>
    {
        using (Accounts.SuppressChangeNotifications())
        {
            Accounts.AddRange(l);
        }
    })
    .SelectMany(x => x)
    .Select(x => Observable.DeferAsync(async _ =>
    {
        var res = await x.ProcessAsync(config, m, outputPath);
        processed++;
        var prog = ((double) processed/inputs.Count())*100.0;
        OverallProgress.Message.OnNext(string.Format("Processing Accounts ({0:000}%)", prog));
        OverallProgress.Progress.OnNext(prog);
        return Observable.Return(res);
    }))
    .Merge(5)
    .All(x => x);

果然,我得到了滚动完成行为(例如,如果1/5完成,那么只有一个开始)。

显然我还需要掌握更多的基础知识,但这太棒了!