快速的正态分布双打

本文关键字:正态分布 | 更新日期: 2023-09-27 18:22:30

我们有一个对象数组。每个对象都有两个值。数组应按此值排序。阵列参数为:

  1. 范围为1-10000个元素。大部分时间为100-5000。>10000真的不太可能
  2. 值的分布接近正态
  3. 值只插入一次,之后不会更改(对几乎排序的数组没有重新排序)
  4. 有很多这样的数据样本

现在我们使用OrderBy并执行如下操作:

public class Item
{
    double Value;
    //... some else data fields
}
List<Item> items;           // fill items
items.Sort(p=>p.Value);  // sort

众所周知:

  1. List.Sort(与Array.Sort相同)是Quick Sort算法的一种实现
  2. 快速排序最适用于双精度的均匀分布
  3. 排序的OrderBy实现看起来比List.sort更糟糕

但基准测试仍然表明,这种排序占用了我们95%的软件处理时间。

对于这种特殊情况,是否有更快的排序实现?

快速的正态分布双打

评估排序算法的问题在于有许多因素会影响结果。我不会相信你提供的网站,因为它可能更像是对javascript引擎、可视化和javascript实现的基准测试,而不是实际的排序算法。

堆排序在理论上有很好的特性,但不能充分利用现代CPU优化。

QuickSort理论上更糟,但常见的技巧,如median-of-3和median-of-9枢轴元素,使坏情况变得不太可能,并且通过线性处理阵列,CPU可以很好地优化它。

当您不需要就地排序时,MergeSort是很好的。使用它并针对预排序和几乎预排序的数据对其进行优化并非易事,但您可能会了解Python和Java7使用的Tim排序。

没有一般的答案,比如"快速排序对高斯分布数据不利"。理论和实践之间存在着巨大的差距。理论上,插入排序和快速排序比堆排序更糟糕。事实上,在大多数情况下,一个优化良好的快速排序是很难击败的,特别是因为它很容易优化,并从CPU缓存中受益。Tim Sort不是一个简单的合并者。它实际上是与InsertionSort的混合,以针对对象已排序运行的常见情况进行优化。

其次,这应该是相当明显的,没有一个提到的排序算法实际计算两个对象的差异。因此,任何不产生大量重复的分布在他们看来都是一样的。他们看到的只是"小于、等于、大于"。所以分布之间唯一的区别就是有多少个对象是相等的!事实上,只有桶排序算法(例如基数排序)才应该关心对象分布,因为它们使用的是<=>比较之外的实际值。

对于您的特殊情况,您需要检查您的列表是如何组织的。链表真的不利于排序。事实上,如果我没有记错的话,Java会将几乎所有集合转换为本地数组进行排序,然后重新构建集合。其次,p=>p.Value的概念很不错,但也可能会付出相当大的代价。这可能会导致创建、管理和垃圾收集各种附加对象。

您应该尝试的第一件事是检查例如,完整比较器是否比lambda语法概念更快查看内存管理。最有可能的是,这就是实际工作发生的地方,不必要地复制和转换替身。

例如,C#可能会为数据集"double->index"构建一个反向映射,然后对该数组进行排序,然后使用映射对数据进行排序。如果lambda函数非常昂贵,并且只应该计算一次,那么这是很好的。

由于O(n^2)的努力和5000元素的向量,因此值得尝试

  1. 将您的列表转换为纯双数组
  2. 使用一些专门的排序库对数组进行排序
  3. 从排序中获取排序顺序索引,然后
  4. 根据索引重新排列列表中的项目

这是可行的,因为它只增加了O(2n)的工作量,因此如果由于更快的比较,开销小于增益,它可能是可以忽略的,并且会得到回报。

如果我有时间的话,我会在这里发布一个例子。

@编辑:我已经测试了这个演示(以及我自己的兴趣)。下面的测试将List.Sort()与copy-Sort-copy-back方法进行比较。后期是用ILNumerics快速排序完成的。两个版本在所有长度上都是相继运行的(意味着:不是交错)。

免责声明:这只是为了得到一个粗略的概述,如果方法会支付的。在我的电脑(酷睿i5,2.4 GHz,3MB L3数据缓存)上,它确实如此。但盈亏平衡点很难确定。此外,与往常一样,对于这种快速而肮脏的性能测量,一大堆影响都被忽略了。可能最重要的是:由于在实际生产环境中可能不需要多个拷贝而导致的缓存问题。

代码:

namespace ConsoleApplication1 {
unsafe class Program : ILNumerics.ILMath {
static void Main(string[] args) {
    int numItems = 0, repet = 20000; 
    Stopwatch sw01 = new Stopwatch();
    // results are collected in a dictionary: 
    // key: list length
    // value: tuple with times taken by ILNumerics and List.Sort()
    var results = new Dictionary<int, Tuple<float,float>>();
    // the comparer used for List.Sort() see below 
    ItemComparer comparer = new ItemComparer();
    // run test for copy-sort-copy back via ILNumerics
    for (numItems = 500; numItems < 50000; numItems = (int)(numItems * 1.3)) {
        Console.Write("'r measuring: {0}", numItems);  
        long ms = 0;
        List<Item> a = makeData(numItems);
        for (int i = 0; i < repet; i++) {
            sw01.Restart();
            List<Item> b1 = fastSort(a);
            sw01.Stop();
            ms += sw01.ElapsedMilliseconds;
        }
        results.Add(numItems,new Tuple<float,float>((float)ms / repet, 0f)); 
    }
    // run test using the straightforward approach, List.Sort(IComparer)
    for (numItems = 500; numItems < 50000; numItems = (int)(numItems * 1.3)) {
        Console.Write("'r measuring: {0}", numItems);  
        List<Item> a = makeData(numItems);
        long ms = 0;
        for (int i = 0; i < repet; i++) {
            List<Item> copyList = new List<Item>(a);
            sw01.Restart();
            copyList.Sort(comparer);
            sw01.Stop();
            ms += sw01.ElapsedMilliseconds;
        }
        results[numItems] = new Tuple<float, float>(results[numItems].Item1, (float)ms / repet); 
    }
    // Print results
    Console.Clear(); 
    foreach (var v in results) 
        Console.WriteLine("Length: {0} | ILNumerics/CLR: {1} / {2} ms", v.Key, v.Value.Item1, v.Value.Item2);
    Console.ReadKey(); 
}
public class Item {
    public double Value;
    //... some else data fields
}
public class ItemComparer : Comparer<Item> {
    public override int Compare(Item x, Item y) {
        return (x.Value > y.Value)  ? 1 
             : (x.Value == y.Value) ? 0 : -1;
    }
}

public static List<Item> makeData(int n) {
    List<Item> ret = new List<Item>(n); 
    using (ILScope.Enter()) {
        ILArray<double> A = rand(1,n);
        double[] values = A.GetArrayForRead(); 
        for (int i = 0; i < n; i++) {
            ret.Add(new Item() { Value = values[i] }); 
        }
    }
    return ret; 
}
public static List<Item> fastSort(List<Item> unsorted) {
    //double [] values = unsorted.ConvertAll<double>(item => item.Value).ToArray(); 
    //// maybe more efficient? safes O(n) run 
    //double[] values = new double[unsorted.Count];
    //for (int i = 0; i < values.Length; i++) {
    //    values[i] = unsorted[i].Value;
    //}
    using (ILScope.Enter()) {
        // convert incoming
        ILArray<double> doubles = zeros(unsorted.Count);
        double[] doublesArr = doubles.GetArrayForWrite();
        for (int i = 0; i < doubles.Length; i++) {
            doublesArr[i] = unsorted[i].Value;
        }
        // do fast sort 
        ILArray<double> indices = empty();
        doubles = sort(doubles, Indices: indices);
        // convert outgoing
        List<Item> ret = new List<Item>(unsorted.Count); 
        foreach (double i in indices) ret.Add(unsorted[(int)i]); 
        return ret; 
    }
}
}
}

这给出了以下输出:

Length: 500 | ILNumerics / List.Sort: 0,00395 / 0,0001 ms
Length: 650 | ILNumerics / List.Sort: 0,0003 / 0,0001 ms
Length: 845 | ILNumerics / List.Sort: 0,00035 / 0,0003 ms
Length: 1098 | ILNumerics / List.Sort: 0,0003 / 0,00015 ms
Length: 1427 | ILNumerics / List.Sort: 0,0005 / 0,00055 ms
Length: 1855 | ILNumerics / List.Sort: 0,00195 / 0,00055 ms
Length: 2000 | ILNumerics / List.Sort: 0,00535 / 0,0006 ms
Length: 2600 | ILNumerics / List.Sort: 0,0037 / 0,00295 ms
Length: 3380 | ILNumerics / List.Sort: 0,00515 / 0,0364 ms
Length: 4394 | ILNumerics / List.Sort: 0,0051 / 1,0015 ms
Length: 4500 | ILNumerics / List.Sort: 0,1136 / 1,0057 ms
Length: 5850 | ILNumerics / List.Sort: 0,2871 / 1,0047 ms
Length: 7605 | ILNumerics / List.Sort: 0,5015 / 2,0049 ms
Length: 9886 | ILNumerics / List.Sort: 1,1164 / 2,0793 ms
Length: 12851 | ILNumerics / List.Sort: 1,4236 / 3,6335 ms
Length: 16706 | ILNumerics / List.Sort: 1,6202 / 4,9506 ms
Length: 21717 | ILNumerics / List.Sort: 2,3417 / 6,1871 ms
Length: 28232 | ILNumerics / List.Sort: 3,4038 / 8,7888 ms
Length: 36701 | ILNumerics / List.Sort: 4,4406 / 12,1311 ms
Length: 47711 | ILNumerics / List.Sort: 5,7884 / 16,1002 ms

在这里,盈亏平衡似乎在4000个元素左右。通过复制排序复制方法,较大的数组总是以大约3的倍数更快地排序。我想,对于较小的阵列,它可能会付出代价,也可能不会。这里收集的数字是不可靠的。我认为,对于小列表,排序时间被一些其他问题所掩盖,比如内存管理(GC)等等。也许这里有人有更多的想法来解释这一点。

同样奇怪的是List的执行时间跳跃。排序长度为4000项。不知道List.Sort是否切换到另一个(更糟糕的)实现?

关于这个问题,复制项目,在普通数组中排序,并在需要时将其复制回似乎是值得的。根据您的硬件,盈亏平衡可能会上升或下降。所以一如既往:介绍您的实现!

一种解决方案可以是以0.001(或任何任意值p)的步长创建从0到1的仓。请注意,每个仓中的预期数量为p*N。现在,对于数组中的每个数字,计算累积概率(-无穷大的累积概率为0,0为0.5,无穷大为1.0),并将该数字放入相应的bin中。使用您喜欢的排序算法分别对每个垃圾箱进行排序,并合并结果。如果选择p使得p*n=k(k是常数),则在最佳和平均情况下,该算法为O(Nlogk)。