并行ForEach的本地初始化是如何工作的
本文关键字:何工作 工作 ForEach 初始化 并行 | 更新日期: 2023-09-27 17:58:52
我不确定在Parallel.ForEach中使用本地init函数,如msdn文章中所述:http://msdn.microsoft.com/en-us/library/dd997393.aspx
Parallel.ForEach<int, long>(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
{
subtotal += nums[j]; //modify local variable
return subtotal; // value to be passed to next iteration
},...
()=>0如何初始化任何内容?变量的名称是什么?如何在循环逻辑中使用它?
参考Parallel.ForEach
静态扩展方法的以下过载:
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
Action<TLocal> localFinally
)
在您的特定示例中
线路:
() => 0, // method to initialize the local variable
只是一个lambda(匿名函数),它将返回常量整数零。此lambda作为localInit
参数传递给Parallel.ForEach
-由于lambda返回一个整数,它的类型为Func<int>
,编译器可以将类型TLocal
推断为int
(类似地,TSource
可以从作为参数source
传递的集合的类型推断)
然后将返回值(0)作为第三个参数(命名为subtotal
)传递给taskBody
Func
。此(0)用于身体循环的初始种子:
(j, loop, subtotal) =>
{
subtotal += nums[j]; //modify local variable (Bad idea, see comment)
return subtotal; // value to be passed to next iteration
}
第二个lambda(传递给taskBody
)被调用N次,其中N是TPL分区器分配给该任务的项数。
对第二个taskBody
lambda的每次后续调用都将传递subTotal
的新值,从而有效地计算该Task的运行中的部分总计。添加完分配给该任务的所有项目后,将再次调用第三个也是最后一个localFinally
函数参数,传递taskBody
返回的subtotal
的最终值。由于几个这样的任务将并行运行,因此还需要最后一步将所有部分总数相加为最终的"总计"。但是,由于多个并发任务(在不同的线程上)可能会争夺grandTotal
变量,因此以线程安全的方式对其进行更改是很重要的。
(我更改了MSDN变量的名称以使其更加清晰)
long grandTotal = 0;
Parallel.ForEach(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
subtotal + nums[j], // value to be passed to next iteration subtotal
// The final value of subtotal is passed to the localFinally function parameter
(subtotal) => Interlocked.Add(ref grandTotal, subtotal)
在MS示例中,修改任务主体内的参数小计是一种糟糕的做法,并且是不必要的。即代码subtotal += nums[j]; return subtotal;
将更好地作为return subtotal + nums[j];
,其可以缩写为lambda简写投影(j, loop, subtotal) => subtotal + nums[j]
一般
Parallel.For/Parallel.ForEach的localInit / body / localFinally
重载允许每个任务运行一次初始化和清理代码,在任务执行taskBody
迭代之前和之后(分别)。
(注意,传递给并行For
/Foreach
的For范围/Enumerable将被划分为IEnumerable<>
的批,每个批将被分配一个Task)
在每个任务中,localInit
将被调用一次,body
代码将被重复调用,批量中每个项目调用一次(0..N
次),localFinally
将在完成时调用一次。
此外,您可以通过来自localInit Func
的通用TLocal
返回值传递任务持续时间所需的任何状态(即传递给taskBody
和localFinally
代理)-我在下面调用了这个变量taskLocals
。
"localInit"的常见用法:
- 创建和初始化循环体所需的昂贵资源,如数据库连接或web服务连接
- 保持任务本地变量保持(未控制的)运行汇总或集合
- 如果需要将多个对象从
localInit
返回到taskBody
和localFinally
,则可以使用强类型类Tuple<,,>
,或者,如果仅对localInit / taskBody / localFinally
使用lambda,则还可以通过匿名类传递数据。请注意,如果您使用localInit
的返回在多个任务之间共享一个引用类型,则需要考虑该对象的线程安全性——不变性更可取
"localFinally"操作的常见用法:
- 释放
taskLocals
中使用的IDisposables
等资源(例如数据库连接、文件句柄、web服务客户端等) - 将每个任务所做的工作聚合/组合/减少到共享变量中。这些共享变量将被争用,因此线程安全是一个问题:
- 例如,像整数这样的基元类型上的
Interlocked.Increment
- 写入操作将需要
lock
或类似 - 利用并发收集来节省时间和精力
- 例如,像整数这样的基元类型上的
taskBody
是循环操作的tight
部分,您需要对其进行性能优化。
这一切最好用一个有评论的例子来总结:
public void MyParallelizedMethod()
{
// Shared variable. Not thread safe
var itemCount = 0;
Parallel.For(myEnumerable,
// localInit - called once per Task.
() =>
{
// Local `task` variables have no contention
// since each Task can never run by multiple threads concurrently
var sqlConnection = new SqlConnection("connstring...");
sqlConnection.Open();
// This is the `task local` state we wish to carry for the duration of the task
return new
{
Conn = sqlConnection,
RunningTotal = 0
}
},
// Task Body. Invoked once per item in the batch assigned to this task
(item, loopState, taskLocals) =>
{
// ... Do some fancy Sql work here on our task's independent connection
using(var command = taskLocals.Conn.CreateCommand())
using(var reader = command.ExecuteReader(...))
{
if (reader.Read())
{
// No contention for `taskLocal`
taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
}
}
// The same type of our `taskLocal` param must be returned from the body
return taskLocals;
},
// LocalFinally called once per Task after body completes
// Also takes the taskLocal
(taskLocals) =>
{
// Any cleanup work on our Task Locals (as you would do in a `finally` scope)
if (taskLocals.Conn != null)
taskLocals.Conn.Dispose();
// Do any reduce / aggregate / synchronisation work.
// NB : There is contention here!
Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
}
还有更多的例子:
每个任务的未控制字典示例
每个任务的数据库连接示例
作为@Honza Brestan答案的扩展。Parallel foreach将工作划分为任务的方式也很重要,它将把几个循环迭代分组为一个任务,因此在实践中,循环的每n次迭代调用一次localInit()
,可以同时启动多个组。
localInit
和localFinally
的目的是确保并行foreach循环可以将每个itteration的结果组合成一个结果,而无需在body
中指定锁语句。要做到这一点,您必须为要创建的值(localInit
)提供初始化,然后每个body
itteration都可以处理本地值,然后提供一种方法,以线程安全的方式组合每个组(localFinally
)的值。
如果同步任务不需要localInit,则可以使用lambda方法正常引用周围上下文中的值,而不会出现任何问题。有关使用localInit/Finally的更深入教程,请参阅C#中的线程(Parallel.For和Parallel.ForEach),并向下滚动到使用本地值优化,Joseph Albahari确实是我所有线程的goto源。
您可以在MSDN上的正确的Parallel.ForEach
重载中获得提示。
localInit委托为参与循环执行的每个线程调用一次,并为每个任务返回初始本地状态。这些初始状态被传递给每个任务的第一个主体调用。然后,每个后续的主体调用都会返回一个可能已修改的状态值,该值将传递给下一个主体调用。
在您的示例中,() => 0
是一个刚刚返回0
的委托,因此该值用于每个任务的第一次迭代。
从我的角度来看,是一个更简单的例子
class Program
{
class Person
{
public int Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
}
static List<Person> GetPerson() => new List<Person>()
{
new Person() { Id = 0, Name = "Artur", Age = 26 },
new Person() { Id = 1, Name = "Edward", Age = 30 },
new Person() { Id = 2, Name = "Krzysiek", Age = 67 },
new Person() { Id = 3, Name = "Piotr", Age = 23 },
new Person() { Id = 4, Name = "Adam", Age = 11 },
};
static void Main(string[] args)
{
List<Person> persons = GetPerson();
int ageTotal = 0;
Parallel.ForEach
(
persons,
() => 0,
(person, loopState, subtotal) => subtotal + person.Age,
(subtotal) => Interlocked.Add(ref ageTotal, subtotal)
);
Console.WriteLine($"Age total: {ageTotal}");
Console.ReadKey();
}
}