Rx扩展:Parallel.ForEach在哪里?

本文关键字:在哪里 ForEach Parallel 扩展 Rx | 更新日期: 2023-09-27 18:18:28

我有一段使用Parallel.ForEach的代码,可能基于旧版本的Rx扩展或任务并行库。我安装了当前版本的Rx扩展,但找不到Parallel.ForEach。我没有使用库的任何其他花哨的东西,只是想并行处理一些数据,像这样:

Parallel.ForEach(records, ProcessRecord);

我发现了这个问题,但是我不想依赖于旧版本的Rx。但是我没有找到类似的Rx,那么使用当前的Rx版本,当前最直接的方法是什么?该项目使用。net 3.5。

Rx扩展:Parallel.ForEach在哪里?

如果你有Rx,就不需要做这些愚蠢的事情了:

records.ToObservable()
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .ToList()
    .First();

(或者,如果您希望以效率为代价保持项的顺序):

records.ToObservable()
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))
    .Concat()
    .ToList()
    .First();

或者,如果您想限制同一时间的条目数量:

records.ToObservable()
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)))
    .Merge(5 /* at a time */)
    .ToList()
    .First();

下面是一个简单的替换:

class Parallel
{
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }
        if (body == null)
        {
            throw new ArgumentNullException("body");
        }
        var items = new List<T>(source);
        var countdown = new CountdownEvent(items.Count);
        WaitCallback callback = state =>
        {
            try
            {
                body((T)state);
            }
            finally
            {
                countdown.Signal();
            }
        };
        foreach (var item in items)
        {
            ThreadPool.QueueUserWorkItem(callback, item);
        }
        countdown.Wait();
    }
}

如果有人发现这个线程现在,更新版本的答案将是:

records.AsParallel().WithDegreeOfParallelism(5).ForAll(x => ProcessRecord(x));