Сравнение массивов (элемент за элементом)

Алгоритм, с которым я работаю, тратит огромную часть времени на сравнение одного массива с строкой матрицы. Если любой i-й элемент один и тот же, алгоритм вызывает процедуру A, если ни один элемент не равен, вместо этого вызывается процедура B. Например:

[1, 4, 10, 3, 5] и [5, 3, 0, 3, 0] вызывает A(), потому что для 4-й позиции значение равно 3 в обоих массивах.

[1, 4, 10, 3, 5] и [5, 3, 0, 1, 0] вызывает B(), потому что для той же позиции значения никогда не совпадают.

Заметим, что (1) массивы и строки матрицы всегда имеют одинаковый размер N, и (2) алгоритм вызывает A() , когда соответствует хотя бы одно значение.

Самый простой, но очень наивный способ сделать это в C:

for(int i=0; i<N; i++)
   if( A[i] == B[i] ){
      flag = 1;
      break;
   }

Это все еще очень неэффективно. В худшем случае у меня будет N сравнений. Реальная проблема здесь заключается в том, что алгоритм выполняет триллионы этих сравнений.

N (размер массива/строки в матрице) варьируется от 100 до 1000. Я бы хотел ускорить эту процедуру. Я посмотрел на векторию, и я обнаружил, что могу использовать cmpeq_pd. Однако векторизация будет по-прежнему ограничена, потому что все мои записи longs. Есть ли кто-нибудь с идеей? Могу ли я применять маски и т.д., Возможно?

Дополнительная информация/контекст:

  • Это итеративный алгоритм. На каждой итерации я увеличиваю матрицу в одной строке и проверю всю матрицу несколько раз. Я мог бы также обновить пару строк.
  • Вероятность совпадения не зависит от позиции.
  • Я готов иметь ложные срабатывания и негативы, чтобы значительно ускорить эту процедуру.
  • Если есть совпадение, позиция, в которой проверяется совпадение, имеет значение не (мне просто нужно знать, есть ли подходящая позиция).
  • Самое большое количество (около 70%) сравнений не приводит к совпадению.
  • Параллелизация выполняется на другом уровне, т.е. это ядро ​​нельзя распараллелить.

Ответы

Ответ 1

Я не знаю, применимо ли это для приложения, которое вы разрабатываете, но операции с огромными массивами обычно очень хорошо ускоряются на графическом процессоре. Вы можете ожидать увеличения пропускной способности 10-20x по сравнению с процессором. Если это возможно для вашего приложения запустить критическую часть на CUDA, это может иметь огромное значение.

Ответ 2

Несмотря на то, что ваш процессор Sandy Bridge имеет только AVX для 256-битного SIMD (а не AVX2) и поэтому не имеет поддержки для 4-разрядных операций с биполярным ЦИФОМ в 64-разрядном режиме, я думаю, что вы все равно можете достичь 4-стороннего SIMD с помощью инструкций с плавающей точкой AVX, поскольку следует: сравнить 2 x 256-битных вектора 64-битных целых значений, v1, v2:

__m256d vcmp = _mm256_xor_pd(v1, v2); // use XOR rather than compare, so we are not 
                                      // affected by values which map to NaNs
vcmp = _mm256_cmp_pd(vcmp, _mm256_setzero_pd(), _CMP_EQ_OQ);
                                      // now we can do a valid comparison as if the
                                      // data really is double precision float
int mask = _mm256_movemask_pd(vcmp);  // extract the sign bits
bool any_eq = (mask != 0);            // if any elements matched then mask
                                      // will be non-zero

Вот пример программы для тестирования и иллюстрации:

#include <stdio.h>
#include <stdint.h>
#include <immintrin.h>

int test(__m256d v1, __m256d v2)
{
    __m256d vcmp = _mm256_xor_pd(v1, v2);
    vcmp = _mm256_cmp_pd(vcmp, _mm256_setzero_pd(), _CMP_EQ_OQ);
    return _mm256_movemask_pd(vcmp);
}

int main()
{
    int64_t a1[4] = { 3098, 3860, 405, 3308 };
    int64_t a2[4] = { 1930, 1274, 2195, 2939 };
    int64_t a3[4] = { 1930, 1274, 405, 2939 };

    __m256i v1 = _mm256_loadu_pd((double *)a1);
    __m256i v2 = _mm256_loadu_pd((double *)a2);
    __m256i v3 = _mm256_loadu_pd((double *)a3);

    printf("mask = %d (should be == 0)\n", test(v1, v2));

    printf("mask = %d (should be != 0)\n", test(v1, v3));

    return 0;
}

Тест:

$ gcc -Wall -mavx a3mlord2.c && ./a.out 
mask = 0 (should be == 0)
mask = 4 (should be != 0)

Ответ 3

Всякий раз, когда вы ищете оптимизацию, перед вами идут разные пути:

  • алгоритмическая оптимизация: как правило, алгоритм сортировки для использования в вашем случае с использованием некоторых зависимостей внутри или между строками для проверки только некоторых случаев, а не значений N. Вы не сказали ничего, что мы можем использовать для этого, но, возможно, вы знаете такие правила - такая оптимизация может иметь прирост на порядок
  • Оптимизация среднего уровня: как только вы выбрали свой алгоритм, проверьте, как вы организуете свои циклы и тесты, - опять же, я не знаю, что можно сделать - обычно получайте около 10%, за исключением ужасных реализаций
  • Оптимизация на низком уровне: попытка быть умнее оптимизирующего компилятора, как правило, делает вас свободными, но в некоторых случаях бенчмаркинг различной реализации может дать выигрыш в несколько процентов.
  • Распараллеливание: если алгоритм поддерживает его, вы делите общую обработку на число или ядра или процессоры. Ожидаемый выигрыш обычно немного ниже количества одновременных потоков.

С учетом того, что вы сказали, единственной возможной оптимизацией будет распараллеливание обработки на n ядрах, каждое ядро ​​(минус одно), выполняющее часть строки, и другое, обрабатывающее результат этих первых сравнений. Но, как говорилось ранее, если есть правила в данных, коэффициент усиления может быть намного выше.

Ответ 4

Самый простой, но очень наивный способ сделать это в C - с помощью Как вы подразумеваете по вашим вопросам, пример кода, который вы предоставляете в рамках этого утверждения, может быть простым с точки зрения читаемости, но переводит ли он на самый простой наиболее эффективный метод сравнения данных после компиляции?

Предложите попробовать сравнить блок:
Способ представления данных для сравнения может способствовать скорости и эффективности сравнения. Загрузите значения в отдельные переменные (назначенные для использования отдельных регистров), затем сравните регистры.

long a1 = A[0];
long a2 = A[1];
long a3 = A[2];
long a4 = A[3];
...
long an = A[n];

long b1 = B[0];
long b2 = B[1];
long b3 = B[2];
long b4 = B[3];
...
long bn = B[n];

if ((a1 == b1) || (a2 == b2) || (a3 == b3) || (a4 == b4) ... || (an == bn))
{
   //do something
}
else
{
   //do something else
}

Чтобы действительно знать, является ли метод самым быстрым, его код, посмотрите на сборку, которую он создает, или проверите тест. Как вы предложили в своем посте, цикл элементов массива, вероятно, не самый эффективный способ.

EDIT: наклонная идея: Matlab известен тем, что включает в себя некоторые из самых быстрых алгоритмов сравнения массивов, а также имеет возможность преобразования Matlab в C. Если у вас или у коллеги есть копия Matlab, вы можете попробовать некоторое тестирование скорости для алгоритмов, созданных с использованием Matlab, а затем конвертировать в C, чтобы наблюдать за тем, что он создает. Я использовал эту функцию раньше, создаваемые ею C-конструкции не очень красивы, но обычно очень эффективны (с точки зрения скорости).

Ответ 5

Обработка SIMD вряд ли поможет вообще: у вас довольно небольшой цикл, который затрагивает множество данных (16 байтов на итерацию). Это, скорее всего, насытит шину памяти даже при работе без SIMD.

Как я вижу, у вас есть два основных варианта:

  • Вы используете шины более/более шире.
    Это может быть достигнуто за счет использования нескольких ядер или графических процессоров.

  • Вы пытаетесь уменьшить количество сравнений.
    Возможно ли это из вашего вопроса непонятно, но если ваш алгоритм делает одно и то же сравнение несколько раз, вы можете перестроить свой алгоритм, кэшируя результаты сравнения. В зависимости от алгоритма это может привести к значительному ускорению.

Ответ 6

Если вы используете gcc, и если вы находитесь на платформе x86, ваш код, скорее всего, выиграет от использования memcmp() вместо цикла "homegrown" for. memcmp() (соответственно встроенный аналог) делает некоторые довольно умные оптимизации.