AsParallel扩展的实际工作方式

本文关键字:方式 实际工作 扩展 AsParallel | 更新日期: 2023-09-27 18:28:41

所以主题就是问题。

我得到的方法AsParallel返回使用相同LINQ关键字的包装器ParallelQuery<TSource>,但来自System.Linq.ParallelEnumerable而不是System.Linq.Enumerable

这很清楚,但当我查看反编译的源代码时,我不明白它是如何工作的。

让我们从一个最简单的扩展开始:Sum()方法。代码:

[__DynamicallyInvokable]
public static int Sum(this ParallelQuery<int> source)
{
  if (source == null)
    throw new ArgumentNullException("source");
  else
    return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate();
}

很明显,让我们转到Aggregate()方法。它是InternalAggregate方法的包装器,用于捕获一些异常。现在让我们来看看。

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

问题是:它是如何运作的?我看不到一个变量的并发安全性,它被许多线程修改,我们只看到迭代器和求和。它是魔术枚举器吗?或者它是如何工作的?GetEnumerator()返回QueryOpeningEnumerator<TOutput>,但是它的代码太复杂了。

AsParallel扩展的实际工作方式

在我的第二次PLINQ攻击中,我终于找到了答案。而且很清楚。问题是枚举器并不简单。这是一个特殊的multithreading。那么它是如何工作的呢?答案是enumerator不会返回source的下一个值,而是返回下一个分区的总和。因此,当在enumerator.OpenQuery方法中的enumerator.MoveNext内部执行实际求和工作时,此代码仅执行2,4,6,8…次(基于Environment.ProcessorCount)。

因此,TPL显然对源进行了可枚举的分区,然后独立地对每个分区求和,然后对这个求和进行处理,参见IntSumAggregationOperatorEnumerator<TKey>。这里没有魔法,只能更深一层。

Sum运算符在单个线程中聚合所有值。这里没有多线程。诀窍是多线程正在其他地方发生。

PLINQ Sum方法可以处理PLINQ可枚举对象。这些可枚举对象可以使用其他构造(如where)来构建,这些构造允许在多个线程上处理集合。

Sum运算符始终是链中的最后一个运算符。尽管可以在多个线程上处理这个和,但TPL团队可能发现这对性能产生了负面影响,这是合理的,因为这种方法唯一要做的就是简单的整数加法。

因此,该方法处理来自其他线程的所有可用结果,并在单个线程上处理它们并返回该值。真正的诀窍在于其他PLINQ扩展方法。

protected override int InternalAggregate(ref Exception singularExceptionToThrow)
{
  using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?    (ParallelMergeOptions.FullyBuffered), true))
  {
    int num = 0;
    while (enumerator.MoveNext())
      checked { num += enumerator.Current; }
    return num;
  }
}

这段代码不会并行执行,而会在内部按顺序执行。

试试这个

        List<int> list = new List<int>();
        int num = 0;
        Parallel.ForEach(list, (item) =>
            {
                checked { num += item; }
            });

内部操作将分布在ThreadPool上,当处理完所有项目时,ForEach语句将完成。

这里你需要线程安全:

        List<int> list = new List<int>();
        int num = 0;
        Parallel.ForEach(list, (item) =>
            {
                Interlocked.Add(ref num, item);
            });