并行Linq查询优化

本文关键字:查询优化 Linq 并行 | 更新日期: 2023-09-27 18:22:45

一段时间以来,我一直在围绕没有副作用的方法构建代码,以便使用并行linq来加快速度。在这一过程中,我不止一次偶然发现懒惰的评估会让事情变得更糟而不是更好,我想知道是否有任何工具可以帮助优化并行linq查询。

我之所以这么问,是因为我最近通过修改一些方法并在某些关键位置添加AsParallel,重构了一些令人尴尬的并行代码。运行时间从2分钟下降到45秒,但从性能监视器上可以清楚地看到,在某些地方,CPU上的所有内核都没有得到充分利用。在几次错误启动后,我使用ToArray强制执行了一些查询,运行时间进一步下降到了16秒。减少代码的运行时间感觉很好,但也有点令人不安,因为不清楚在代码中哪里需要使用ToArray强制执行查询。等到最后一分钟才执行查询并不是最佳策略,但根本不清楚在代码中的哪些点需要强制执行一些子查询才能利用所有CPU核心。

事实上,我不知道如何正确地使用ToArray或其他强制执行linq计算以获得最大CPU利用率的方法。那么,有没有优化并行linq查询的通用指南和工具呢?

下面是一个伪代码示例:

var firstQuery = someDictionary.SelectMany(FirstTransformation);
var secondQuery = firstQuery.Select(SecondTransformation);
var thirdQuery = secondQuery.Select(ThirdTransformation).Where(SomeConditionCheck);
var finalQuery = thirdQuery.Select(FinalTransformation).Where(x => x != null);

FirstTransformationSecondTransformationThirdTransformation都是CPU界的,就复杂度而言,它们是一些3x3矩阵乘法和一些if分支。SomeConditionCheck相当于null检查。FinalTransformation是代码中CPU最密集的部分,因为它将执行一整串线平面相交,并检查这些相交的多边形包含,然后提取最靠近线上某个点的相交。

我不知道为什么我放置AsParallel的地方会像它那样减少代码的运行时间。我现在已经达到了当地的最低运行时间,但我不知道为什么。我偶然发现了它,真是运气太差了。如果你想知道AsParallel的位置是第一行和最后一行。将AsParallel放置在其他位置只会增加运行时间,有时最多可增加20秒。还有一个隐藏的ToArray隐藏在那里的第一行。

并行Linq查询优化

这里有几件事:

  1. PLINQ比未计数的IEnumerables更有效地并行化集合。如果你有一个数组,它会将数组长度除以你的CPU核心数量,并平均分配任务。但是,如果你有一个长度未知的IEnumerable,它会做一种愚蠢的指数上升类型的事情,任务会一次处理1、2、4、8等元素,直到它到达IEnumerable
  2. 通过并行化所有查询,您可以将工作分解为小块。如果您在N个元素上有M个并行查询,那么您最终会得到M*N个任务。与只并行化最后一个查询相比,这会带来更多的线程开销,在这种情况下,您最终只需要N个任务
  3. PLINQ在处理每个任务所花费的时间大致相同时效果最佳。这样它就可以将它们平均地分配到核心中。通过并行化具有不同性能行为的每个查询,您将有M*N个任务需要不同的时间,而PLINQ无法对它们进行最佳调度(因为它不提前知道每个任务可能需要多长时间)

因此,这里的总体指导原则是:如果可能的话,确保在开始之前已经有了一个数组,并且只在求值前的最后一个查询上放置AsParallel。因此,以下内容应该非常有效:

var firstQuery = someDictionary.SelectMany().ToArray().Select(FirstTransformation);
var secondQuery = firstQuery.Select(SecondTransformation);
var thirdQuery = secondQuery.Select(ThirdTransformation).AsParallel().Where(SomeConditionCheck).ToArray();
var finalQuery = thirdQuery.Select(FinalTransformation).AsParallel().Where(x => x != null);

如果没有看到实际的代码,几乎不可能判断。但是,作为一项一般指导原则,您应该考虑在复杂的数字运算过程中避免P/LINQ,因为委托和IEnumerable开销太高了。通过使用线程获得的速度很可能会被LINQ提供的方便抽象所消耗。

这里有一些代码,它确实计算了2个整数列表的和,做了一些int到float的比较,然后计算它的cos。使用LINQ的.Zip运算符可以很好地完成一些非常基本的事情。。。或者用for循环的老式方式。

在我的Haswell 8核心机器上用更新的ParallelLinq更新1

  • 林克0,95s
  • Linq并行0,19s
  • 优化0,45s
  • 优化的并行0,08s

更新1结束

  • LINQ 1,65s
  • 优化0,64s
  • 优化并行0,40s

由于IEnumerable的惰性和方法调用开销(我确实使用了发布模式x32 Windows 7,.NET 4双核),时差几乎是3倍。我曾尝试在LINQ版本中使用AsParallel,但它确实变慢了(2,3秒)。如果你是数据驱动的,你应该使用Parallel.For构造来获得良好的可伸缩性。IEnumerable本身就是一个糟糕的并行化候选者,因为

  • 你不知道自己有多少工作,直到最后才列举出来
  • 您不能进行急切的分块,因为您不知道IEnumerable返回下一个元素(可能是web服务调用或数组索引访问)的速度有多快

下面是一个代码示例来说明这一点。如果你想对裸金属进行更多的优化,你首先需要摆脱抽象,因为抽象确实会让每件物品花费太多。与非内联MoveNext()和Current方法调用相比,数组访问要便宜得多。

    class Program
    {
        static void Main(string[] args)
        {
            var A = new List<int>(Enumerable.Range(0, 10*1000*1000));
            var B = new List<int>(Enumerable.Range(0, 10*1000*1000));
            double[] Actual = UseLinq(A, B);
            double[] pActual = UseLinqParallel(A, B);
            var other = Optimized(A, B);
            var pother = OptimizedParallel(A, B);
        }
        private static double[] UseLinq(List<int> A, List<int> B)
        {
            var sw = Stopwatch.StartNew();
            var Merged = A.Zip(B, (a, b) => a + b);
            var Converted = A.Select(a => (float)a);
            var Result = Merged.Zip(Converted, (m, c) => Math.Cos((double)m / ((double)c + 1)));
            double[] Actual = Result.ToArray();
            sw.Stop();
            Console.WriteLine("Linq {0:F2}s", sw.Elapsed.TotalSeconds);
            return Actual;
        }
    private static double[] UseLinqParallel(List<int> A, List<int> B)
    {
        var sw = Stopwatch.StartNew();
        var x = A.AsParallel();
        var y = B.AsParallel();
        var Merged = x.Zip(y, (a, b) => a + b);
        var Converted = x.Select(a => (float)a);
        var Result = Merged.Zip(Converted, (m, c) => Math.Cos((double)m / ((double)c + 1)));
        double[] Actual = Result.ToArray();
        sw.Stop();
        Console.WriteLine("Linq Parallel {0:F2}s", sw.Elapsed.TotalSeconds);
        return Actual;
    }        
        private static double[] OptimizedParallel(List<int> A, List<int> B)
        {
            double[] result = new double[A.Count];
            var sw = Stopwatch.StartNew();
            Parallel.For(0, A.Count, (i) =>
            {
                var sum = A[i] + B[i];
                result[i] = Math.Cos((double)sum / ((double)((float)A[i]) + 1));
            });
            sw.Stop();
            Console.WriteLine("Optimized Parallel {0:F2}s", sw.Elapsed.TotalSeconds);
            return result;
        }
        private static double[] Optimized(List<int> A, List<int> B)
        {
            double[] result = new double[A.Count];
            var sw = Stopwatch.StartNew();
            for(int i=0;i<A.Count;i++)
            {
                var sum = A[i] + B[i];
                result[i] = Math.Cos((double)sum / ((double)((float)A[i]) + 1));
            }
            sw.Stop();
            Console.WriteLine("Optimized {0:F2}s", sw.Elapsed.TotalSeconds);
            return result;
        }
    }
}