Быстрое пересечение двух отсортированных целых массивов
Мне нужно найти пересечение двух отсортированных целых массивов и сделать это очень быстро.
Сейчас я использую следующий код:
int i = 0, j = 0;
while (i < arr1.Count && j < arr2.Count)
{
if (arr1[i] < arr2[j])
{
i++;
}
else
{
if (arr2[j] < arr1[i])
{
j++;
}
else
{
intersect.Add(arr2[j]);
j++;
i++;
}
}
}
К сожалению, для выполнения всей работы может потребоваться несколько часов.
Как это сделать быстрее? Я нашел эту статью, где используются инструкции SIMD. Можно ли использовать SIMD в .NET?
О чем вы думаете:
http://docs.go-mono.com/index.aspx?link=N:Mono.Simd Mono.SIMD
http://netasm.codeplex.com/ NetASM (ввести код asm в управляемый)
и что-то вроде http://www.atrevido.net/blog/PermaLink.aspx?guid=ac03f447-d487-45a6-8119-dc4fa1e932e1
EDIT:
Когда я говорю, что тысячи я имею в виду следующее (в коде)
for(var i=0;i<arrCollection1.Count-1;i++)
{
for(var j=i+1;j<arrCollection2.Count;j++)
{
Intersect(arrCollection1[i],arrCollection2[j])
}
}
Ответы
Ответ 1
UPDATE
Самый быстрый, который я получил, - 200 мс с размером массива 10 мил, с небезопасной версией (последний фрагмент кода).
Тест, который я сделал:
var arr1 = new int[10000000];
var arr2 = new int[10000000];
for (var i = 0; i < 10000000; i++)
{
arr1[i] = i;
arr2[i] = i * 2;
}
var sw = Stopwatch.StartNew();
var result = arr1.IntersectSorted(arr2);
sw.Stop();
Console.WriteLine(sw.Elapsed); // 00:00:00.1926156
Полная почта:
Я тестировал различные способы сделать это и нашел, что это очень хорошо:
public static List<int> IntersectSorted(this int[] source, int[] target)
{
// Set initial capacity to a "full-intersection" size
// This prevents multiple re-allocations
var ints = new List<int>(Math.Min(source.Length, target.Length));
var i = 0;
var j = 0;
while (i < source.Length && j < target.Length)
{
// Compare only once and let compiler optimize the switch-case
switch (source[i].CompareTo(target[j]))
{
case -1:
i++;
// Saves us a JMP instruction
continue;
case 1:
j++;
// Saves us a JMP instruction
continue;
default:
ints.Add(source[i++]);
j++;
// Saves us a JMP instruction
continue;
}
}
// Free unused memory (sets capacity to actual count)
ints.TrimExcess();
return ints;
}
Для дальнейшего улучшения вы можете удалить ints.TrimExcess();
, что также будет иметь большое значение, но вы должны подумать, если вам понадобится эта память.
Кроме того, если вы знаете, что вы можете разбить циклы, которые используют пересечения, и вам не нужно иметь результаты в виде массива/списка, вы должны изменить реализацию на итератор:
public static IEnumerable<int> IntersectSorted(this int[] source, int[] target)
{
var i = 0;
var j = 0;
while (i < source.Length && j < target.Length)
{
// Compare only once and let compiler optimize the switch-case
switch (source[i].CompareTo(target[j]))
{
case -1:
i++;
// Saves us a JMP instruction
continue;
case 1:
j++;
// Saves us a JMP instruction
continue;
default:
yield return source[i++];
j++;
// Saves us a JMP instruction
continue;
}
}
}
Еще одно улучшение заключается в использовании небезопасного кода:
public static unsafe List<int> IntersectSorted(this int[] source, int[] target)
{
var ints = new List<int>(Math.Min(source.Length, target.Length));
fixed (int* ptSrc = source)
{
var maxSrcAdr = ptSrc + source.Length;
fixed (int* ptTar = target)
{
var maxTarAdr = ptTar + target.Length;
var currSrc = ptSrc;
var currTar = ptTar;
while (currSrc < maxSrcAdr && currTar < maxTarAdr)
{
switch ((*currSrc).CompareTo(*currTar))
{
case -1:
currSrc++;
continue;
case 1:
currTar++;
continue;
default:
ints.Add(*currSrc);
currSrc++;
currTar++;
continue;
}
}
}
}
ints.TrimExcess();
return ints;
}
В целом наибольшее влияние на производительность было в if-else.
Включение его в каркас делало огромную разницу (примерно в 2 раза быстрее).
Ответ 2
Вы пробовали что-то простое:
var a = Enumerable.Range(1, int.MaxValue/100).ToList();
var b = Enumerable.Range(50, int.MaxValue/100 - 50).ToList();
//var c = a.Intersect(b).ToList();
List<int> c = new List<int>();
var t1 = DateTime.Now;
foreach (var item in a)
{
if (b.BinarySearch(item) >= 0)
c.Add(item);
}
var t2 = DateTime.Now;
var tres = t2 - t1;
Этот фрагмент кода содержит 1 массив элементов 21,474,836, а другой - 21 474 786
Если я использую var c = a.Intersect(b).ToList();
, я получаю OutOfMemoryException
Результат продукта будет 461,167,507,485,096 итераций с использованием вложенных foreach
Но с помощью этого простого кода пересечение произошло в TotalSeconds = 7.3960529 (с использованием одного ядра)
Теперь я все еще не счастлив, поэтому я стараюсь увеличить производительность, нарушая это параллельно, как только я закончу, я отправлю его
Ответ 3
Yorye Nathan дал мне самое быстрое пересечение двух массивов с последним методом "небезопасного кода". К сожалению, для меня все еще было слишком медленно, мне нужно было сделать комбинации пересечений массивов, которые достигают комбинаций 2 ^ 32, почти нет? Я сделал следующие изменения и настройки, и время сократилось до 2,6 раза быстрее. Перед тем, как сделать предварительную оптимизацию, вы наверняка сможете сделать это так или иначе. Я использую только индексы вместо фактических объектов или идентификаторов или другого абстрактного сравнения. Итак, на примере, если вам нужно пересечь большое число, подобное этому
Arr1: 103344, 234566, 789900, 1947890,
Arr2: 150034, 234566, 845465, 23849854
поместить все в массив и массив
Arr1: 103344, 234566, 789900, 1947890, 150034, 845465,23849854
и использовать для пересечения упорядоченные индексы массива результатов
Arr1Index: 0, 1, 2, 3
Arr2Index: 1, 4, 5, 6
Теперь у нас есть меньшие числа, с которыми мы можем построить другие приятные массивы. Что я сделал после принятия метода из Yorye, я взял Arr2Index и расширил его, теоретически логический массив, практически в байтовые массивы из-за импликации размера памяти, чтобы:
Arr2IndexCheck: 0, 1, 0, 0, 1, 1, 1
который является более или менее словарем, который говорит мне для любого индекса, если второй массив содержит его.
Следующий шаг я не использовал распределение памяти, которое также занимало время, вместо этого я предварительно создал массив результатов перед вызовом метода, поэтому в процессе поиска моих комбинаций я никогда не создавал никаких экземпляров. Конечно, вам приходится иметь дело с длиной этого массива отдельно, поэтому, возможно, вам нужно его где-то сохранить.
Наконец, код выглядит следующим образом:
public static unsafe int IntersectSorted2(int[] arr1, byte[] arr2Check, int[] result)
{
int length;
fixed (int* pArr1 = arr1, pResult = result)
fixed (byte* pArr2Check = arr2Check)
{
int* maxArr1Adr = pArr1 + arr1.Length;
int* arr1Value = pArr1;
int* resultValue = pResult;
while (arr1Value < maxArr1Adr)
{
if (*(pArr2Check + *arr1Value) == 1)
{
*resultValue = *arr1Value;
resultValue++;
}
arr1Value++;
}
length = (int)(resultValue - pResult);
}
return length;
}
Вы можете увидеть, что размер массива результата возвращается функцией, затем вы делаете то, что хотите (измените размер, сохраните его). Очевидно, массив результатов должен иметь как минимум минимальный размер arr1 и arr2.
Большим улучшением является то, что я только перебираю первый массив, который лучше всего иметь меньший размер, чем второй, поэтому у вас меньше итераций. Чем меньше итераций меньше, тем меньше циклов процессора?
Итак, вот действительно быстрое пересечение двух упорядоченных массивов, что, если вам нужна высокая производительность, а также reaaaallally;).
Ответ 4
Являются ли arrCollection1
и arrCollection2
коллекциями массивов целых чисел? В этом случае вы должны получить некоторое заметное улучшение, запустив второй цикл из i+1
в отличие от 0
Ответ 5
С# не поддерживает SIMD. Кроме того, и я еще не понял, почему DLL, использующая SSE, не быстрее, когда вызывается из С#, чем эквивалентные функции, не относящиеся к SSE. Кроме того, все расширения SIMD, о которых я знаю, в любом случае не работают с ветвлением, т.е. Ваши инструкции "if".
Если вы используете .net 4.0, вы можете использовать Parallel For для получения скорости, если у вас несколько ядер. В противном случае вы можете написать многопоточную версию, если у вас есть .net 3.5 или меньше.
Вот метод, похожий на ваш:
IList<int> intersect(int[] arr1, int[] arr2)
{
IList<int> intersect = new List<int>();
int i = 0, j = 0;
int iMax = arr1.Length - 1, jMax = arr2.Length - 1;
while (i < iMax && j < jMax)
{
while (i < iMax && arr1[i] < arr2[j]) i++;
if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
while (i < iMax && arr1[i] == arr2[j]) i++; //prevent reduntant entries
while (j < jMax && arr2[j] < arr1[i]) j++;
if (arr1[i] == arr2[j]) intersect.Add(arr1[i]);
while (j < jMax && arr2[j] == arr1[i]) j++; //prevent redundant entries
}
return intersect;
}
Это также предотвращает появление какой-либо записи дважды. С 2 отсортированными массивами размером 10 миллионов, он завершился примерно через секунду. Компилятор должен удалить проверки границ массива, если вы используете array.Length в операторе For, я не знаю, работает ли это в while while.