与固定数量的任务/线程并行使用 IEnumerable

本文关键字:并行 线程 IEnumerable 任务 | 更新日期: 2023-09-27 17:56:29

我有一个源IEnumerable<T>,我想以并行方式处理它,具有固定数量的任务/线程(接近处理器的数量),每个任务/线程都从源中抓取下一项并对其进行处理,直到所有元素都已迭代。

  • Parallel.For不是候选者,因为元素的数量未知。
  • Parallel.ForEach 不是候选者,因为即使指定MaxDegreeOfParallelism也会创建许多任务,因为此参数只能确保并发运行的最大任务数,而不是创建的任务数。
  • 必须通知每个任务,源已遍历直到其结束,以便它可以运行一些总结逻辑。
  • 源列表的元素不能保存在内存中,但必须连续处理和丢弃。

听起来像是生产者/消费者的问题,简化了生产者可以是单线程的,一旦 IEnumerable 完成,就不会再添加任何元素。

使用TPL解决此问题的方法如何?我是否必须实现自己的可共享线程安全IEnumerable或者框架是否提供了一些东西?

编辑:这是我对Parallel.ForEach的尝试,并指定MaxDegreeOfParallelism,这不会阻止TPL创建许多任务。

int nbTasks = 0;
Parallel.ForEach(positions, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    () => { return new List<IPositionData>(); },
    (position, loop, list) =>
    {
        Thread.Sleep(1);
        list.Add(position);
        return list;
     },
     list => Interlocked.Add(ref nbTasks, 1));
Trace.WriteLine(string.Format("Tasks: {0}", nbTasks));

评论:positions是我的来源IEnumerable<IPositionData>。我刚刚运行了这个,例如,nbTasks 是 64(而不是我的 4 个内核上的预期 4

与固定数量的任务/线程并行使用 IEnumerable

)。

可以通过使用需要ParallelOptions对象的重载并设置 MaxDegreeOfParallelism 属性来限制Parallel.ForEach中的任务数。

您可以在Parallel.ForEach中限制任务数量:

   in maxNumberOfTasks = 4;
   Parallel.ForEach(collection, new ParallelOptions { MaxDegreeOfParallelism = maxNumberOfTasks}, 
                 i => {
                        //Your action here
                  });