两个排序整数数组的快速交集

本文关键字:数组 整数 两个 排序 | 更新日期: 2023-09-27 18:36:03

我需要找到两个排序整数数组的交集并且做得非常快。

现在,我正在使用以下代码:

int i = 0, j = 0;
while (i < arr1.Count && j < arr2.Count)
{
    if (arr1[i] < arr2[j])
    {
        i++;
    }
    else
    {
        if (arr2[j] < arr1[i])
        {
            j++;
        }
        else 
        {
            intersect.Add(arr2[j]);
            j++;
            i++;
        }
    }
}

不幸的是,完成所有工作可能需要几个小时。

如何做得更快?我找到了这篇使用 SIMD 指令的文章。是否可以在 .NET 中使用 SIMD?

你怎么看:

http://docs.go-mono.com/index.aspx?link=N:Mono.Simd 单声道

http://netasm.codeplex.com/NetASM(将 asm 代码注入托管)

和类似 http://www.atrevido.net/blog/PermaLink.aspx?guid=ac03f447-d487-45a6-8119-dc4fa1e932e1 的东西

 

编辑:

当我说数千时,我的意思是跟随(在代码中)

for(var i=0;i<arrCollection1.Count-1;i++)
{
    for(var j=i+1;j<arrCollection2.Count;j++)
    {
        Intersect(arrCollection1[i],arrCollection2[j])  
    }
}

两个排序整数数组的快速交集

更新

我得到的最快的是 200 毫秒,数组大小为 10mil,不安全版本(最后一段代码)。

我做的测试:

var arr1 = new int[10000000];
var arr2 = new int[10000000];
for (var i = 0; i < 10000000; i++)
{
    arr1[i] = i;
    arr2[i] = i * 2;
}
var sw = Stopwatch.StartNew();
var result = arr1.IntersectSorted(arr2);
sw.Stop();
Console.WriteLine(sw.Elapsed); // 00:00:00.1926156

全文:

我已经测试了各种方法,发现这非常好:

public static List<int> IntersectSorted(this int[] source, int[] target)
{
    // Set initial capacity to a "full-intersection" size
    // This prevents multiple re-allocations
    var ints = new List<int>(Math.Min(source.Length, target.Length));
    var i = 0;
    var j = 0;
    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;
                // Saves us a JMP instruction
                continue;
            case 1:
                j++;
                // Saves us a JMP instruction
                continue;
            default:
                ints.Add(source[i++]);
                j++;
                // Saves us a JMP instruction
                continue;
        }
    }
    // Free unused memory (sets capacity to actual count)
    ints.TrimExcess();
    return ints;
}

为了进一步改进,您可以删除 ints.TrimExcess(); ,这也会产生很大的不同,但您应该考虑是否需要该内存。

此外,如果您知道可能会中断使用交集的循环,并且不必将结果作为数组/列表,则应将实现更改为迭代器:

public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
    var i = 0;
    var j = 0;
    while (i < source.Length && j < target.Length)
    {
        // Compare only once and let compiler optimize the switch-case
        switch (source[i].CompareTo(target[j]))
        {
            case -1:
                i++;
                // Saves us a JMP instruction
                continue;
            case 1:
                j++;
                // Saves us a JMP instruction
                continue;
            default:
                yield return source[i++];
                j++;
                // Saves us a JMP instruction
                continue;
        }
    }
}

另一个改进是使用不安全的代码:

public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
    var ints = new List<int>(Math.Min(source.Length, target.Length));
    fixed (int* ptSrc = source)
    {
        var maxSrcAdr = ptSrc + source.Length;
        fixed (int* ptTar = target)
        {
            var maxTarAdr = ptTar + target.Length;
            var currSrc = ptSrc;
            var currTar = ptTar;
            while (currSrc < maxSrcAdr && currTar < maxTarAdr)
            {
                switch ((*currSrc).CompareTo(*currTar))
                {
                    case -1:
                        currSrc++;
                        continue;
                    case 1:
                        currTar++;
                        continue;
                    default:
                        ints.Add(*currSrc);
                        currSrc++;
                        currTar++;
                        continue;
                }
            }
        }
    }
    ints.TrimExcess();
    return ints;
}

综上所述,最大的性能打击是在if-else's。将其变成开关盒产生了巨大的差异(大约快 2 倍)。

你有没有尝试过这样简单的事情:

var a = Enumerable.Range(1, int.MaxValue/100).ToList();
var b = Enumerable.Range(50, int.MaxValue/100 - 50).ToList();
//var c = a.Intersect(b).ToList();
List<int> c = new List<int>();
var t1 = DateTime.Now;
foreach (var item in a)
{
    if (b.BinarySearch(item) >= 0)
        c.Add(item);
}
var t2 = DateTime.Now;
var tres = t2 - t1;
这段代码接受 1 个包含 21,474,

836 个元素的数组,另一个包含 21,474,786 个元素

如果我使用var c = a.Intersect(b).ToList();我会得到OutOfMemoryException

结果乘积将是 461,167,507,485,096 次迭代,使用嵌套 foreach

但是使用这个简单的代码,交集发生在 TotalSeconds = 7.3960529(使用一个内核)中

现在我仍然不满意,所以我试图通过并行打破它来提高性能,一旦我完成我会发布它

Yorye Nathan 给了我两个数组与最后一个"不安全代码"方法的最快交集。不幸的是,这对我来说仍然太慢了,我需要进行数组交集的组合,最多可以组合 2^32 个,几乎不是吗?我进行了以下修改和调整,时间缩短了 2.6 倍。您需要在之前进行一些预优化,以确保您可以以某种方式进行优化。我只使用索引而不是实际对象或 id 或其他一些抽象比较。所以,通过示例,如果你必须像这样与大数相交

到达1: 103344, 234566, 789900, 1947890,到达 2: 150034, 234566, 845465, 23849854

将所有内容放入和阵列中

到达1: 103344, 234566, 789900, 1947890, 150034, 845465,23849854

并使用结果数组的有序索引进行交

到达1索引: 0, 1, 2, 3到达2索引: 1, 4, 5, 6

现在我们有更小的数字,我们可以用他们构建一些其他漂亮的数组。在从 Yorye 那里获取该方法后,我做了什么,我采用了 Arr2Index 并将其扩展到理论上的布尔数组,实际上扩展到字节数组,因为内存大小的含义,如下所示:

到达2索引检查:

0, 1, 0, 0, 1, 1 ,1

这或多或少是一个字典,它告诉我第二个数组是否包含它的任何索引。下一步我没有使用内存分配,这也需要时间,而是在调用方法之前预先创建了结果数组,因此在查找组合的过程中,我从未实例化任何内容。当然,您必须单独处理此数组的长度,因此可能需要将其存储在某个地方。

最后代码如下所示:

    public static unsafe int IntersectSorted2(int[] arr1, byte[] arr2Check, int[] result)
    {
        int length;
        fixed (int* pArr1 = arr1, pResult = result)
        fixed (byte* pArr2Check = arr2Check)
        {
            int* maxArr1Adr = pArr1 + arr1.Length;
            int* arr1Value = pArr1;
            int* resultValue = pResult;
            while (arr1Value < maxArr1Adr)
            {
                if (*(pArr2Check + *arr1Value) == 1)
                {
                    *resultValue = *arr1Value;
                    resultValue++;
                }
                arr1Value++;
            }
            length = (int)(resultValue - pResult);
        }
        return length;
    }
您可以

看到函数返回的结果数组大小,然后您可以按照自己的意愿进行操作(调整大小,保留它)。显然,结果数组必须至少具有 arr1 和 arr2 的最小大小。

最大的改进是我只遍历第一个数组,最好比第二个数组小,这样你的迭代次数就会更少。更少的迭代意味着更少的 CPU 周期,对吗?

所以这是两个有序数组的真正快速交集,如果你需要一个真正的高性能;)。

arrCollection1arrCollection2 是整数数组的集合吗?在这种情况下,您应该通过从i+1而不是0启动第二个循环来获得一些显着的改进

C# 不支持 SIMD。 此外,我还没有弄清楚为什么,使用 SSE 的 DLL 在从 C# 调用时并不比非 SSE 等效函数快。 此外,我所知道的所有 SIMD 扩展都不适用于分支,即您的"if"语句。

如果使用的是 .net 4.0,则可以使用并行 For 来提高速度(如果有多个内核)。 否则,如果你的 .net 3.5 或更低版本,则可以编写多线程版本。

这是一个类似于你的方法:

    IList<int> intersect(int[] arr1, int[] arr2)
    {
        IList<int> intersect = new List<int>();
        int i = 0, j = 0;
        int iMax = arr1.Length - 1, jMax = arr2.Length - 1;
        while (i < iMax && j < jMax)
        {
            while (i < iMax && arr1[i] < arr2[j]) i++;
            if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
            while (i < iMax && arr1[i] == arr2[j]) i++; //prevent reduntant entries
            while (j < jMax && arr2[j] < arr1[i]) j++;
            if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
            while (j < jMax && arr2[j] == arr1[i]) j++; //prevent redundant entries
        }
        return intersect;
    }

这还可以防止任何条目出现两次。 有 2 个大小为 1000 万的排序数组,它在大约一秒钟内完成。 如果使用数组,编译器应该删除数组边界检查。For 语句中的长度,我不知道这是否适用于一段时间语句。