并行ForEach的本地初始化是如何工作的

本文关键字:何工作 工作 ForEach 初始化 并行 | 更新日期: 2023-09-27 17:58:52

我不确定在Parallel.ForEach中使用本地init函数,如msdn文章中所述:http://msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach<int, long>(nums, // source collection
   () => 0, // method to initialize the local variable
   (j, loop, subtotal) => // method invoked by the loop on each iteration
   {
      subtotal += nums[j]; //modify local variable 
      return subtotal; // value to be passed to next iteration
   },...

()=>0如何初始化任何内容?变量的名称是什么?如何在循环逻辑中使用它?

并行ForEach的本地初始化是如何工作的

参考Parallel.ForEach静态扩展方法的以下过载:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
    Action<TLocal> localFinally
)

在您的特定示例中

线路:

() => 0, // method to initialize the local variable

只是一个lambda(匿名函数),它将返回常量整数零。此lambda作为localInit参数传递给Parallel.ForEach-由于lambda返回一个整数,它的类型为Func<int>,编译器可以将类型TLocal推断为int(类似地,TSource可以从作为参数source传递的集合的类型推断)

然后将返回值(0)作为第三个参数(命名为subtotal)传递给taskBody Func。此(0)用于身体循环的初始种子:

(j, loop, subtotal) =>
{
    subtotal += nums[j]; //modify local variable (Bad idea, see comment)
    return subtotal;     // value to be passed to next iteration
}

第二个lambda(传递给taskBody)被调用N次,其中N是TPL分区器分配给该任务的项数。

对第二个taskBody lambda的每次后续调用都将传递subTotal的新值,从而有效地计算该Task的运行中的部分总计。添加完分配给该任务的所有项目后,将再次调用第三个也是最后一个localFinally函数参数,传递taskBody返回的subtotal的最终值。由于几个这样的任务将并行运行,因此还需要最后一步将所有部分总数相加为最终的"总计"。但是,由于多个并发任务(在不同的线程上)可能会争夺grandTotal变量,因此以线程安全的方式对其进行更改是很重要的。

(我更改了MSDN变量的名称以使其更加清晰)

long grandTotal = 0;
Parallel.ForEach(nums,            // source collection
  () => 0,                        // method to initialize the local variable
  (j, loop, subtotal) =>          // method invoked by the loop on each iteration
     subtotal + nums[j],          // value to be passed to next iteration subtotal
  // The final value of subtotal is passed to the localFinally function parameter
  (subtotal) => Interlocked.Add(ref grandTotal, subtotal)

在MS示例中,修改任务主体内的参数小计是一种糟糕的做法,并且是不必要的。即代码subtotal += nums[j]; return subtotal;将更好地作为return subtotal + nums[j];,其可以缩写为lambda简写投影(j, loop, subtotal) => subtotal + nums[j]

一般

Parallel.For/Parallel.ForEach的localInit / body / localFinally重载允许每个任务运行一次初始化和清理代码,在任务执行taskBody迭代之前和之后(分别)。

(注意,传递给并行For/Foreach的For范围/Enumerable将被划分为IEnumerable<>的批,每个批将被分配一个Task)

每个任务中,localInit将被调用一次,body代码将被重复调用,批量中每个项目调用一次(0..N次),localFinally将在完成时调用一次。

此外,您可以通过来自localInit Func的通用TLocal返回值传递任务持续时间所需的任何状态(即传递给taskBodylocalFinally代理)-我在下面调用了这个变量taskLocals

"localInit"的常见用法:

  • 创建和初始化循环体所需的昂贵资源,如数据库连接或web服务连接
  • 保持任务本地变量保持(未控制的)运行汇总或集合
  • 如果需要将多个对象从localInit返回到taskBodylocalFinally,则可以使用强类型类Tuple<,,>,或者,如果仅对localInit / taskBody / localFinally使用lambda,则还可以通过匿名类传递数据。请注意,如果您使用localInit的返回在多个任务之间共享一个引用类型,则需要考虑该对象的线程安全性——不变性更可取

"localFinally"操作的常见用法:

  • 释放taskLocals中使用的IDisposables等资源(例如数据库连接、文件句柄、web服务客户端等)
  • 将每个任务所做的工作聚合/组合/减少到共享变量中。这些共享变量将被争用,因此线程安全是一个问题:
    • 例如,像整数这样的基元类型上的Interlocked.Increment
    • 写入操作将需要lock或类似
    • 利用并发收集来节省时间和精力

taskBody是循环操作的tight部分,您需要对其进行性能优化。

这一切最好用一个有评论的例子来总结:

public void MyParallelizedMethod()
{
    // Shared variable. Not thread safe
    var itemCount = 0; 
    Parallel.For(myEnumerable, 
    // localInit - called once per Task.
    () => 
    {
       // Local `task` variables have no contention 
       // since each Task can never run by multiple threads concurrently
       var sqlConnection = new SqlConnection("connstring...");
       sqlConnection.Open();
       // This is the `task local` state we wish to carry for the duration of the task
       return new 
       { 
          Conn = sqlConnection,
          RunningTotal = 0
       }
    },
    // Task Body. Invoked once per item in the batch assigned to this task
    (item, loopState, taskLocals) =>
    {
      // ... Do some fancy Sql work here on our task's independent connection
      using(var command = taskLocals.Conn.CreateCommand())
      using(var reader = command.ExecuteReader(...))
      {
        if (reader.Read())
        {
           // No contention for `taskLocal`
           taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
        }
      }
      // The same type of our `taskLocal` param must be returned from the body
      return taskLocals;
    },
    // LocalFinally called once per Task after body completes
    // Also takes the taskLocal
    (taskLocals) =>
    {
       // Any cleanup work on our Task Locals (as you would do in a `finally` scope)
       if (taskLocals.Conn != null)
         taskLocals.Conn.Dispose();
       // Do any reduce / aggregate / synchronisation work.
       // NB : There is contention here!
       Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
    }

还有更多的例子:

每个任务的未控制字典示例

每个任务的数据库连接示例

作为@Honza Brestan答案的扩展。Parallel foreach将工作划分为任务的方式也很重要,它将把几个循环迭代分组为一个任务,因此在实践中,循环的每n次迭代调用一次localInit(),可以同时启动多个组。

localInitlocalFinally的目的是确保并行foreach循环可以将每个itteration的结果组合成一个结果,而无需在body中指定锁语句。要做到这一点,您必须为要创建的值(localInit)提供初始化,然后每个body itteration都可以处理本地值,然后提供一种方法,以线程安全的方式组合每个组(localFinally)的值。

如果同步任务不需要localInit,则可以使用lambda方法正常引用周围上下文中的值,而不会出现任何问题。有关使用localInit/Finally的更深入教程,请参阅C#中的线程(Parallel.For和Parallel.ForEach),并向下滚动到使用本地值优化,Joseph Albahari确实是我所有线程的goto源。

您可以在MSDN上的正确的Parallel.ForEach重载中获得提示。

localInit委托为参与循环执行的每个线程调用一次,并为每个任务返回初始本地状态。这些初始状态被传递给每个任务的第一个主体调用。然后,每个后续的主体调用都会返回一个可能已修改的状态值,该值将传递给下一个主体调用。

在您的示例中,() => 0是一个刚刚返回0的委托,因此该值用于每个任务的第一次迭代。

从我的角度来看,是一个更简单的例子

class Program
{
    class Person
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
    }
    static List<Person> GetPerson() => new List<Person>()
    {
        new Person() { Id = 0, Name = "Artur", Age = 26 },
        new Person() { Id = 1, Name = "Edward", Age = 30 },
        new Person() { Id = 2, Name = "Krzysiek", Age = 67 },
        new Person() { Id = 3, Name = "Piotr", Age = 23 },
        new Person() { Id = 4, Name = "Adam", Age = 11 },
    };
    static void Main(string[] args)
    {
        List<Person> persons = GetPerson();
        int ageTotal = 0;
        Parallel.ForEach
        (
            persons,
            () => 0,
            (person, loopState, subtotal) => subtotal + person.Age,
            (subtotal) => Interlocked.Add(ref ageTotal, subtotal)
        );
        Console.WriteLine($"Age total: {ageTotal}");
        Console.ReadKey();
    }
}