Быстрое слияние отсортированных подмножеств чисел с плавающей запятой в 4K в L1/L2

Что такое быстрый способ слияния отсортированных подмножеств массива до 4096 32-разрядных чисел с плавающей запятой на современном процессоре (SSE2 +) x86?

Предположите следующее:

  • Размер всего набора не превышает 4096 элементов.
  • Размер подмножеств открыт для обсуждения, но предположим, что между 16-256 изначально
  • Все данные, используемые слиянием, должны предпочтительно вписываться в L1
  • Размер кэша данных L1 составляет 32K. 16K уже используется для самих данных, поэтому у вас есть 16K для воспроизведения с
  • Все данные уже находятся в L1 (с такой высокой степенью достоверности) - он просто управлялся сортировкой
  • Все данные выравниваются по 16 байт
  • Мы хотим попытаться минимизировать ветвление (по понятным причинам)

Основные критерии выполнимости: быстрее, чем сортировка radix в L1 LSD.

Мне было бы очень интересно узнать, знает ли кто-нибудь разумный способ сделать это, учитывая приведенные выше параметры!:)

Ответы

Ответ 1

Здесь очень наивный способ сделать это. (Пожалуйста, извините за ошибки, вызванные белыми 4-мя ошибками на основе бред;)

//4x sorted subsets
data[4][4] = {
  {3, 4, 5, INF},
  {2, 7, 8, INF},
  {1, 4, 4, INF},
  {5, 8, 9, INF}
}

data_offset[4] = {0, 0, 0, 0}

n = 4*3

for(i=0, i<n, i++):
  sub = 0
  sub = 1 * (data[sub][data_offset[sub]] > data[1][data_offset[1]])
  sub = 2 * (data[sub][data_offset[sub]] > data[2][data_offset[2]])
  sub = 3 * (data[sub][data_offset[sub]] > data[3][data_offset[3]])

  out[i] = data[sub][data_offset[sub]]
  data_offset[sub]++


Edit:
С поддержкой AVX2 и ее поддержкой мы могли одновременно сравнивать до 8 подмножеств.


Изменить 2:
В зависимости от типа литья можно было бы сбрасывать 3 дополнительных тактовых цикла на итерацию на Nehalem (mul: 5, shift + sub: 4)

//Assuming 'sub' is uint32_t
sub = ... << ((data[sub][data_offset[sub]] > data[...][data_offset[...]]) - 1)


Изменить 3:
Возможно, в какой-то степени можно использовать исполнение вне порядка, особенно когда K становится больше, используя два или более значения max:

max1 = 0
max2 = 1
max1 = 2 * (data[max1][data_offset[max1]] > data[2][data_offset[2]])
max2 = 3 * (data[max2][data_offset[max2]] > data[3][data_offset[3]])
...
max1 = 6 * (data[max1][data_offset[max1]] > data[6][data_offset[6]])
max2 = 7 * (data[max2][data_offset[max2]] > data[7][data_offset[7]])

q = data[max1][data_offset[max1]] < data[max2][data_offset[max2]]

sub = max1*q + ((~max2)&1)*q


Изменить 4:

В зависимости от интеллекта компилятора мы можем полностью удалить умножения с помощью тернарного оператора:

sub = (data[sub][data_offset[sub]] > data[x][data_offset[x]]) ? x : sub


Изменить 5:

Чтобы избежать дорогостоящих сравнений с плавающей запятой, мы могли бы просто reinterpret_cast<uint32_t*>() данных, так как это привело бы к целочисленному сравнению.

Другая возможность - использовать регистры SSE, поскольку они не печатаются и явно используют целые инструкции сравнения.

Это работает из-за операторов < > ==, дающих те же результаты при интерпретации float на двоичном уровне.


Изменить 6:

Если мы разворачиваем наш цикл достаточно, чтобы соответствовать количеству значений в количестве регистров SSE, мы могли бы выполнить сравнение данных.

В конце итерации мы переведем регистр, содержащий выбранное максимальное/минимальное значение, и сдвинем его.

Хотя для этого требуется немного переработать индексирование, оно может оказаться более эффективным, чем засоривание цикла с помощью LEA.

Ответ 2

Это больше тема исследования, но я нашел эту статью, в которой обсуждается минимизация неверных предсказаний отрасли с использованием сортировки с использованием d-way.

Ответ 3

Наиболее очевидным ответом, который приходит на ум, является стандартное слияние N-way с использованием кучи. Это будет O (N log k). Число подмножеств составляет от 16 до 256, поэтому наихудшее поведение (с 256 подмножествами по 16 элементов) будет 8N.

Поведение кэша должно быть... разумным, хотя и не идеальным. Куча, где большая часть действия, вероятно, останется в кеше повсюду. Часть записываемого массива будет также, скорее всего, находиться в кеше.

У вас есть 16K данных (массив с отсортированными подпоследовательностями), куча (1K, худший случай) и отсортированный выходной массив (16K снова), и вы хотите, чтобы он вписывался в кеш 32 КБ. Звучит как проблема, но, возможно, это не так. Данные, которые, скорее всего, будут заменены, - это фронт выходного массива после перемещения точки вставки. Предполагая, что отсортированные подпоследовательности распределены довольно равномерно, к ним следует обращаться достаточно часто, чтобы сохранить их в кеше.

Ответ 4

Вы можете слить встроенные массивы (дорогие).

typedef unsigned uint;
typedef uint* uint_ptr;

void merge(uint*in1_begin, uint*in1_end, uint*in2_begin, uint*in2_end, uint*out){

  int_ptr in [] = {in1_begin, in2_begin};
  int_ptr in_end [] = {in1_end, in2_end};

  // the loop branch is cheap because it is easy predictable
  while(in[0] != in_end[0] && in[1] != in_end[1]){
    int i = (*in[0] - *in[1]) >> 31;
    *out = *in[i];
    ++out;
    ++in[i];
  }

  // copy the remaining stuff ...
}

Заметим, что (* в [0] - * в [1]) → 31 эквивалентно * в [0] - * в [1] 0, что эквивалентно * в [0] *в 1]. Причина, по которой я записал ее, используя трюк с битрейтом вместо

int i = *in[0] < *in[1];

заключается в том, что не все компиляторы генерируют свободный код ветки для < версия.

К сожалению, вы используете float вместо ints, который сначала кажется showstopper, потому что я не вижу, как реально реализовать * в [0] < * в [1] филиал бесплатный. Однако на большинстве современных архитектур вы интерпретируете битпаттерны позитивных поплавков (которые также не являются NAN, INF или такими странными вещами) как ints и сравнивают их с использованием < и вы все равно получите правильный результат. Возможно, вы распространите это наблюдение на произвольные поплавки.

Ответ 5

Алгоритмы сортировки SIMD уже подробно изучены. В статье Эффективная реализация сортировки на многоядерной архитектуре SIMD-архитектуры описывает эффективный алгоритм для того, что вы описываете (и многое другое).

Основная идея заключается в том, что вы можете уменьшить объединение двух произвольно длинных списков в слияние блоков из k последовательных значений (где k может варьироваться от 4 до 16): первый блок z[0] = merge(x[0], y[0]).lo. Чтобы получить второй блок, мы знаем, что оставшееся merge(x[0], y[0]).hi содержит nx элементы из x и ny элементов из y, с nx+ny == k. Но z[1] не может содержать элементы как из x[1], так и y[1], потому что для этого требуется z[1] содержать больше, чем nx+ny элементов: так что нам просто нужно выяснить, какие из x[1] и y[1] потребностей быть добавленным. Первый с нижним первым элементом обязательно появится сначала в z, поэтому это просто делается путем сравнения их первого элемента. И мы просто повторяем это, пока не будет больше данных для слияния.

Псевдокод, предполагая, что массивы заканчиваются значением +inf:

a := *x++
b := *y++
while not finished:
    lo,hi := merge(a,b)
    *z++ := lo
    a := hi
    if *x[0] <= *y[0]:
        b := *x++
    else:
        b := *y++

(обратите внимание, как это похоже на обычную скалярную реализацию слияния)

Конечно, условный переход не требуется в реальной реализации: например, вы могли условно поменять x и y трюком xor, а затем безоговорочно читать *x++.

merge сам может быть реализован с помощью битной сортировки. Но если k низкий, будет много зависимостей между командами, что приведет к высокой задержке. В зависимости от количества массивов, которые вы должны объединить, вы можете выбрать k достаточно высоко, чтобы задержка в merge была замаскирована или если это возможно, чередуйте несколько двухсторонних слияний. Подробнее см. В документе.


Изменить. Ниже приведена диаграмма при k = 4. Все асимптотики предполагают, что k фиксировано.

  • Большая серая коробка объединяет два массива размера n = m * k (на картинке, m = 3).

    enter image description here

    • Мы работаем с блоками размера k.
    • Блок "целое блочное объединение" объединяет два массива поблочно, сравнивая их первые элементы. Это линейная операция времени, и она не потребляет память, потому что мы передаем данные в остальную часть блока. Производительность не имеет особого значения, поскольку латентность будет ограничена задержкой блоков "merge4" .
    • Каждый флажок "merge4" объединяет два блока, выдает нижние k-элементы и подает верхние k-элементы на следующий "merge4" . Каждый блок "merge4" выполняет ограниченное число операций, а число "merge4" является линейным по n.
    • Таким образом, временная стоимость слияния линейна по n. И поскольку "merge4" имеет более низкую задержку, чем выполнение 8 последовательных сравнений без SIMD, будет большой ускорение по сравнению с объединением без SIMD.
  • Наконец, чтобы расширить наш двухсторонний слияние для объединения многих массивов, мы упорядочиваем большие серые квадраты в классическом стиле "разделяй и властвуй". Каждый уровень имеет сложную линейность по количеству элементов, поэтому общая сложность - это O (n log (n/n0)), где n0 - начальный размер отсортированных массивов, а n - размер конечного массива.

    diagram

Ответ 6

Вы можете сделать простое ядро ​​слияния для объединения списков K:

float *input[K];
float *output;

while (true) {
  float min = *input[0];
  int min_idx = 0;
  for (int i = 1; i < K; i++) {
    float v = *input[i];
    if (v < min) {
      min = v;     // do with cmov
      min_idx = i; // do with cmov
    }
  }
  if (min == SENTINEL) break;
  *output++ = min;
  input[min_idx]++;
}

Там нет кучи, так что это довольно просто. Плохая часть состоит в том, что это O (NK), что может быть плохим, если K велико (в отличие от реализации кучи, которая является O (N log K)). Таким образом, вы просто выбираете максимум K (4 или 8 могут быть хорошими, тогда вы можете развернуть внутренний цикл) и делать большие K каскадным слиянием (дескриптор K = 64, делая 8-сторонние слияния групп списков, затем 8-позиционное слияние результатов).