与固定数量的任务/线程并行使用 IEnumerable
本文关键字:并行 线程 IEnumerable 任务 | 更新日期: 2023-09-27 17:56:29
我有一个源IEnumerable<T>
,我想以并行方式处理它,具有固定数量的任务/线程(接近处理器的数量),每个任务/线程都从源中抓取下一项并对其进行处理,直到所有元素都已迭代。
-
Parallel.For
不是候选者,因为元素的数量未知。 -
Parallel.ForEach
不是候选者,因为即使指定MaxDegreeOfParallelism
也会创建许多任务,因为此参数只能确保并发运行的最大任务数,而不是创建的任务数。 - 必须通知每个任务,源已遍历直到其结束,以便它可以运行一些总结逻辑。
- 源列表的元素不能保存在内存中,但必须连续处理和丢弃。
听起来像是生产者/消费者的问题,简化了生产者可以是单线程的,一旦 IEnumerable 完成,就不会再添加任何元素。
使用TPL解决此问题的方法如何?我是否必须实现自己的可共享线程安全IEnumerable
或者框架是否提供了一些东西?
编辑:这是我对Parallel.ForEach
的尝试,并指定MaxDegreeOfParallelism
,这不会阻止TPL创建许多任务。
int nbTasks = 0;
Parallel.ForEach(positions, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
() => { return new List<IPositionData>(); },
(position, loop, list) =>
{
Thread.Sleep(1);
list.Add(position);
return list;
},
list => Interlocked.Add(ref nbTasks, 1));
Trace.WriteLine(string.Format("Tasks: {0}", nbTasks));
评论:positions
是我的来源IEnumerable<IPositionData>
。我刚刚运行了这个,例如,nbTasks 是 64(而不是我的 4 个内核上的预期 4
可以通过使用需要ParallelOptions
对象的重载并设置 MaxDegreeOfParallelism
属性来限制Parallel.ForEach
中的任务数。
您可以在Parallel.ForEach
中限制任务数量:
in maxNumberOfTasks = 4;
Parallel.ForEach(collection, new ParallelOptions { MaxDegreeOfParallelism = maxNumberOfTasks},
i => {
//Your action here
});