两个排序整数数组的快速交集
本文关键字:数组 整数 两个 排序 | 更新日期: 2023-09-27 18:36:03
我需要找到两个排序整数数组的交集并且做得非常快。
现在,我正在使用以下代码:
int i = 0, j = 0;
while (i < arr1.Count && j < arr2.Count)
{
if (arr1[i] < arr2[j])
{
i++;
}
else
{
if (arr2[j] < arr1[i])
{
j++;
}
else
{
intersect.Add(arr2[j]);
j++;
i++;
}
}
}
不幸的是,完成所有工作可能需要几个小时。
如何做得更快?我找到了这篇使用 SIMD 指令的文章。是否可以在 .NET 中使用 SIMD?
你怎么看:
http://docs.go-mono.com/index.aspx?link=N:Mono.Simd 单声道
http://netasm.codeplex.com/NetASM(将 asm 代码注入托管)
和类似 http://www.atrevido.net/blog/PermaLink.aspx?guid=ac03f447-d487-45a6-8119-dc4fa1e932e1 的东西
编辑:
当我说数千时,我的意思是跟随(在代码中)
for(var i=0;i<arrCollection1.Count-1;i++)
{
for(var j=i+1;j<arrCollection2.Count;j++)
{
Intersect(arrCollection1[i],arrCollection2[j])
}
}
更新
我得到的最快的是 200 毫秒,数组大小为 10mil,不安全版本(最后一段代码)。
我做的测试:
var arr1 = new int[10000000];
var arr2 = new int[10000000];
for (var i = 0; i < 10000000; i++)
{
arr1[i] = i;
arr2[i] = i * 2;
}
var sw = Stopwatch.StartNew();
var result = arr1.IntersectSorted(arr2);
sw.Stop();
Console.WriteLine(sw.Elapsed); // 00:00:00.1926156
全文:
我已经测试了各种方法,发现这非常好:
public static List<int> IntersectSorted(this int[] source, int[] target)
{
// Set initial capacity to a "full-intersection" size
// This prevents multiple re-allocations
var ints = new List<int>(Math.Min(source.Length, target.Length));
var i = 0;
var j = 0;
while (i < source.Length && j < target.Length)
{
// Compare only once and let compiler optimize the switch-case
switch (source[i].CompareTo(target[j]))
{
case -1:
i++;
// Saves us a JMP instruction
continue;
case 1:
j++;
// Saves us a JMP instruction
continue;
default:
ints.Add(source[i++]);
j++;
// Saves us a JMP instruction
continue;
}
}
// Free unused memory (sets capacity to actual count)
ints.TrimExcess();
return ints;
}
为了进一步改进,您可以删除 ints.TrimExcess();
,这也会产生很大的不同,但您应该考虑是否需要该内存。
此外,如果您知道可能会中断使用交集的循环,并且不必将结果作为数组/列表,则应将实现更改为迭代器:
public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
var i = 0;
var j = 0;
while (i < source.Length && j < target.Length)
{
// Compare only once and let compiler optimize the switch-case
switch (source[i].CompareTo(target[j]))
{
case -1:
i++;
// Saves us a JMP instruction
continue;
case 1:
j++;
// Saves us a JMP instruction
continue;
default:
yield return source[i++];
j++;
// Saves us a JMP instruction
continue;
}
}
}
另一个改进是使用不安全的代码:
public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
var ints = new List<int>(Math.Min(source.Length, target.Length));
fixed (int* ptSrc = source)
{
var maxSrcAdr = ptSrc + source.Length;
fixed (int* ptTar = target)
{
var maxTarAdr = ptTar + target.Length;
var currSrc = ptSrc;
var currTar = ptTar;
while (currSrc < maxSrcAdr && currTar < maxTarAdr)
{
switch ((*currSrc).CompareTo(*currTar))
{
case -1:
currSrc++;
continue;
case 1:
currTar++;
continue;
default:
ints.Add(*currSrc);
currSrc++;
currTar++;
continue;
}
}
}
}
ints.TrimExcess();
return ints;
}
综上所述,最大的性能打击是在if-else's。将其变成开关盒产生了巨大的差异(大约快 2 倍)。
你有没有尝试过这样简单的事情:
var a = Enumerable.Range(1, int.MaxValue/100).ToList();
var b = Enumerable.Range(50, int.MaxValue/100 - 50).ToList();
//var c = a.Intersect(b).ToList();
List<int> c = new List<int>();
var t1 = DateTime.Now;
foreach (var item in a)
{
if (b.BinarySearch(item) >= 0)
c.Add(item);
}
var t2 = DateTime.Now;
var tres = t2 - t1;
这段代码接受 1 个包含 21,474,836 个元素的数组,另一个包含 21,474,786 个元素
如果我使用var c = a.Intersect(b).ToList();
我会得到OutOfMemoryException
结果乘积将是 461,167,507,485,096 次迭代,使用嵌套 foreach
但是使用这个简单的代码,交集发生在 TotalSeconds = 7.3960529(使用一个内核)中
现在我仍然不满意,所以我试图通过并行打破它来提高性能,一旦我完成我会发布它
Yorye Nathan 给了我两个数组与最后一个"不安全代码"方法的最快交集。不幸的是,这对我来说仍然太慢了,我需要进行数组交集的组合,最多可以组合 2^32 个,几乎不是吗?我进行了以下修改和调整,时间缩短了 2.6 倍。您需要在之前进行一些预优化,以确保您可以以某种方式进行优化。我只使用索引而不是实际对象或 id 或其他一些抽象比较。所以,通过示例,如果你必须像这样与大数相交
到达1: 103344, 234566, 789900, 1947890,到达 2: 150034, 234566, 845465, 23849854
将所有内容放入和阵列中
到达1: 103344, 234566, 789900, 1947890, 150034, 845465,23849854
并使用结果数组的有序索引进行交
集到达1索引: 0, 1, 2, 3到达2索引: 1, 4, 5, 6
现在我们有更小的数字,我们可以用他们构建一些其他漂亮的数组。在从 Yorye 那里获取该方法后,我做了什么,我采用了 Arr2Index 并将其扩展到理论上的布尔数组,实际上扩展到字节数组,因为内存大小的含义,如下所示:
到达2索引检查:
0, 1, 0, 0, 1, 1 ,1这或多或少是一个字典,它告诉我第二个数组是否包含它的任何索引。下一步我没有使用内存分配,这也需要时间,而是在调用方法之前预先创建了结果数组,因此在查找组合的过程中,我从未实例化任何内容。当然,您必须单独处理此数组的长度,因此可能需要将其存储在某个地方。
最后代码如下所示:
public static unsafe int IntersectSorted2(int[] arr1, byte[] arr2Check, int[] result)
{
int length;
fixed (int* pArr1 = arr1, pResult = result)
fixed (byte* pArr2Check = arr2Check)
{
int* maxArr1Adr = pArr1 + arr1.Length;
int* arr1Value = pArr1;
int* resultValue = pResult;
while (arr1Value < maxArr1Adr)
{
if (*(pArr2Check + *arr1Value) == 1)
{
*resultValue = *arr1Value;
resultValue++;
}
arr1Value++;
}
length = (int)(resultValue - pResult);
}
return length;
}
您可以看到函数返回的结果数组大小,然后您可以按照自己的意愿进行操作(调整大小,保留它)。显然,结果数组必须至少具有 arr1 和 arr2 的最小大小。
最大的改进是我只遍历第一个数组,最好比第二个数组小,这样你的迭代次数就会更少。更少的迭代意味着更少的 CPU 周期,对吗?
所以这是两个有序数组的真正快速交集,如果你需要一个真正的高性能;)。
arrCollection1
和 arrCollection2
是整数数组的集合吗?在这种情况下,您应该通过从i+1
而不是0
启动第二个循环来获得一些显着的改进
C# 不支持 SIMD。 此外,我还没有弄清楚为什么,使用 SSE 的 DLL 在从 C# 调用时并不比非 SSE 等效函数快。 此外,我所知道的所有 SIMD 扩展都不适用于分支,即您的"if"语句。
如果使用的是 .net 4.0,则可以使用并行 For 来提高速度(如果有多个内核)。 否则,如果你的 .net 3.5 或更低版本,则可以编写多线程版本。
这是一个类似于你的方法:
IList<int> intersect(int[] arr1, int[] arr2)
{
IList<int> intersect = new List<int>();
int i = 0, j = 0;
int iMax = arr1.Length - 1, jMax = arr2.Length - 1;
while (i < iMax && j < jMax)
{
while (i < iMax && arr1[i] < arr2[j]) i++;
if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
while (i < iMax && arr1[i] == arr2[j]) i++; //prevent reduntant entries
while (j < jMax && arr2[j] < arr1[i]) j++;
if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
while (j < jMax && arr2[j] == arr1[i]) j++; //prevent redundant entries
}
return intersect;
}
这还可以防止任何条目出现两次。 有 2 个大小为 1000 万的排序数组,它在大约一秒钟内完成。 如果使用数组,编译器应该删除数组边界检查。For 语句中的长度,我不知道这是否适用于一段时间语句。