Безветровое K-средство (или другие оптимизации)
Примечание. Я хотел бы больше узнать о том, как подойти и придумать такие решения, а не само решение.
У меня есть очень критически важная функция в моей системе, которая появляется в качестве профилирующей точки доступа номер один в определенных контекстах. Он находится в середине итерации k-означает (уже многопоточность с использованием параллели для обработки поддиапазонов точек в каждом рабочем потоке).
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
if (dist < pt.min_dist) // <-- #1 hotspot
{
pt.min_dist = dist;
pt.min_index = i;
}
}
Любая экономия времени, затрачиваемого на обработку этого раздела кода, существенно возрастает, поэтому я часто занимаюсь этим много. Возможно, стоит поставить петлю центроида снаружи, например, и повторить параллельные точки для заданного центра тяжести. Количество кластерных точек здесь составляет миллионы, а число центроидов - в тысячах. Алгоритм применяется для нескольких итераций (часто до 10). Он не стремится к идеальной конвергенции/стабильности, а лишь к некоторому "разумному" приближению.
Любые идеи приветствуются, но то, что я действительно хочу обнаружить, - это сделать этот код безветристым, поскольку это позволит использовать SIMD-версию. Я не разработал такого рода умственные способности, чтобы легко понять, как придумать веткистые решения: мой мозг терпит неудачу там, как это было, когда я впервые был подвергнут рекурсии в первые дни, поэтому руководство по написанию нераспространения код и способы разработки соответствующего мышления для него также будут полезны.
Короче говоря, я ищу любые руководства и подсказки и предложения (не обязательно решения) о том, как оптимизировать этот код для микро-оптимизации. У него, скорее всего, есть место для алгоритмических улучшений, но мой блайнд всегда был в решениях по микрооптимизации (и мне любопытно узнать, как применять их более эффективно, не выходя за борт с ним). Он уже плотно многопоточен с короткой параллелью для логики, поэтому я довольно сильно подтолкнул в угол микро-оптимизации как одну из более быстрых задач, чтобы попробовать без более разумного алгоритма. Мы полностью можем изменить расположение памяти.
В ответ на алгоритмические предложения
О том, как смотреть на все это не так, пытаясь оптимизировать алгоритм O (knm), который можно явно улучшить на уровне алгоритма, я полностью согласен. Это подталкивает этот конкретный вопрос в несколько академическую и непрактичную сферу. Однако, если мне удастся дать анекдот, я исхожу из первоначального фона программирования высокого уровня - большой акцент на широкой, крупномасштабной точке зрения, безопасности и очень мало на деталях реализации на низком уровне. Недавно я переключил проекты на совершенно другой вид современного вкуса, и я изучаю всевозможные новые трюки от своих сверстников кэш-памяти, GPGPU, нераспространенных технологий, SIMD, специализированных распределителей памяти, которые фактически превосходят malloc ( но для конкретных сценариев) и т.д.
Здесь я пытаюсь догнать последние тенденции в производительности, и я неожиданно обнаружил, что те старые структуры данных, которые я часто предпочитал в 90-е годы, которые часто были связаны/структуры дерева, на самом деле значительно превосходят гораздо более наивный, грубый, микро-оптимизированный, параллельный код, применяющий настроенные инструкции по смежным блокам памяти. Это несколько разочаровывает в то же время, поскольку я чувствую, что теперь мы подгоняем алгоритмы больше для машины и сужаем возможности таким образом (особенно с GPGPU).
Самое забавное, что я считаю, что этот тип микро-оптимизированного, быстрого кода обработки массива намного проще в обслуживании, чем сложные алгоритмы и структуры данных, которые я использовал раньше. Для начала их проще обобщить. Кроме того, мои сверстники часто могут жаловаться на жалобы относительно определенного замедления в области, просто пощекотать параллель для и, возможно, некоторых SIMD и назвать это с приличной скоростью. Алгоритмические улучшения часто могут предлагать значительно больше, но скорость и неинтрузивность, при которых могут применяться эти микрооптимизации, мне хотелось бы узнать больше в этой области, поскольку чтение статей по лучшим алгоритмам может занять некоторое время (а также потребовать больше обширные изменения). Таким образом, я в последнее время прыгал на эту группу микро-оптимизации немного позже и, возможно, слишком много в этом конкретном случае, но мое любопытство больше связано с расширением диапазона возможных решений для любого сценария.
Демонтажные
Примечание. На самом деле, я очень плохо разбираюсь в сборке, поэтому я часто настраивал вещи больше в пробной и ошибочной форме, придумывая несколько образованных догадок о том, почему горячая точка, показанная в vtune, может быть узким местом, а затем попробовать вещи чтобы увидеть, улучшатся ли времена, предполагая, что у догадок есть какой-то намек на правду, если время действительно улучшилось или полностью пропустило отметку, если они этого не делают.
000007FEEE3FB8A1 jl thread_partition+70h (7FEEE3FB780h)
{
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7 cmp ecx,r10d
000007FEEE3FB8AA jge thread_partition+1F4h (7FEEE3FB904h)
000007FEEE3FB8AC lea rax,[rbx+rbx*2]
000007FEEE3FB8B0 add rax,rax
000007FEEE3FB8B3 lea r8,[rbp+rax*8+8]
{
const ClusterCentroid& cent = centroids[i];
const float x = pt.pos[0] - cent.pos[0];
const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8 movss xmm0,dword ptr [rdx]
const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC movss xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1 movss xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6 subss xmm2,dword ptr [r8]
000007FEEE3FB8CB subss xmm0,dword ptr [r8-4]
000007FEEE3FB8D1 subss xmm1,dword ptr [r8-8]
const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7 mulss xmm2,xmm2
000007FEEE3FB8DB mulss xmm0,xmm0
000007FEEE3FB8DF mulss xmm1,xmm1
000007FEEE3FB8E3 addss xmm2,xmm0
000007FEEE3FB8E7 addss xmm2,xmm1
if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB comiss xmm2,dword ptr [rdx-8]
000007FEEE3FB8EF jae thread_partition+1E9h (7FEEE3FB8F9h)
{
pt.min_dist = dist;
000007FEEE3FB8F1 movss dword ptr [rdx-8],xmm2
pt.min_index = i;
000007FEEE3FB8F6 mov dword ptr [rdx-10h],ecx
000007FEEE3FB8F9 inc ecx
000007FEEE3FB8FB add r8,30h
000007FEEE3FB8FF cmp ecx,r10d
000007FEEE3FB902 jl thread_partition+1A8h (7FEEE3FB8B8h)
for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904 inc edi
000007FEEE3FB906 add rdx,20h
000007FEEE3FB90A cmp edi,dword ptr [rsi+4]
000007FEEE3FB90D jl thread_partition+31h (7FEEE3FB741h)
000007FEEE3FB913 mov rbx,qword ptr [irange]
}
}
}
}
Мы вынуждены ориентироваться на SSE 2 - немного позади в наше время, но пользовательская база фактически сработала один раз, когда мы предположили, что даже SSE 4 был в порядке как минимальное требование (у пользователя была прототипная машина Intel).
Обновление с автономным тестом: ~ 5.6 с
Я очень благодарен за любую помощь! Поскольку кодовая база довольно обширна, и условия для запуска этого кода сложны (системные события запускаются по нескольким потокам), это немного громоздко, чтобы делать экспериментальные изменения и анализировать их каждый раз. Поэтому я установил поверхностный тест на стороне как отдельное приложение, которое другие могут запускать и опробовать, чтобы я мог экспериментировать со всеми этими любезно предложенными решениями.
#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
#define ALIGN16 __declspec(align(16))
#else
#include <malloc.h>
#define ALIGN16 __attribute__((aligned(16)))
#endif
using namespace std;
// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
#ifdef _MSC_VER
return _aligned_malloc(amount, 16);
#else
void* mem = 0;
posix_memalign(&mem, 16, amount);
return mem;
#endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}
// Aligned free.
static void free16(void* mem)
{
#ifdef _MSC_VER
return _aligned_free(mem);
#else
free(mem);
#endif
}
// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;
class Points
{
public:
Points(): data(malloc16_t<Point>(num_points))
{
for (int p=0; p < num_points; ++p)
{
const float xyz[3] =
{
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX
};
init(p, xyz);
}
}
~Points()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].centroid = -1;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void associate(int n, int new_centroid)
{
data[n].centroid = new_centroid;
}
int centroid(int n) const
{
return data[n].centroid;
}
float* operator[](int n)
{
return data[n].xyz;
}
private:
Points(const Points&);
Points& operator=(const Points&);
struct Point
{
int centroid;
float xyz[3];
};
Point* data;
};
class Centroids
{
public:
Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
{
// Naive initial selection algorithm, but outside the
// current area of interest.
for (int c=0; c < num_centroids; ++c)
init(c, points[c]);
}
~Centroids()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].count = 0;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void reset(int n)
{
data[n].count = 0;
data[n].xyz[0] = 0.0f;
data[n].xyz[1] = 0.0f;
data[n].xyz[2] = 0.0f;
}
void sum(int n, const float* pt_xyz)
{
data[n].xyz[0] += pt_xyz[0];
data[n].xyz[1] += pt_xyz[1];
data[n].xyz[2] += pt_xyz[2];
++data[n].count;
}
void average(int n)
{
if (data[n].count > 0)
{
const float inv_count = 1.0f / data[n].count;
data[n].xyz[0] *= inv_count;
data[n].xyz[1] *= inv_count;
data[n].xyz[2] *= inv_count;
}
}
float* operator[](int n)
{
return data[n].xyz;
}
int find_nearest(const float* pt_xyz) const
{
float min_dist_squared = numeric_limits<float>::max();
int min_centroid = -1;
for (int c=0; c < num_centroids; ++c)
{
const float* cen_xyz = data[c].xyz;
const float x = pt_xyz[0] - cen_xyz[0];
const float y = pt_xyz[1] - cen_xyz[1];
const float z = pt_xyz[2] - cen_xyz[2];
const float dist_squared = x*x + y*y * z*z;
if (min_dist_squared > dist_squared)
{
min_dist_squared = dist_squared;
min_centroid = c;
}
}
return min_centroid;
}
private:
Centroids(const Centroids&);
Centroids& operator=(const Centroids&);
struct Centroid
{
int count;
float xyz[3];
};
Centroid* data;
};
// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}
static void k_means(Points& points, Centroids& centroids)
{
// Find the closest centroid for each point.
for (int p=0; p < num_points; ++p)
{
const float* pt_xyz = points[p];
points.associate(p, centroids.find_nearest(pt_xyz));
}
// Reset the data of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.reset(c);
// Compute new position sum of each centroid.
for (int p=0; p < num_points; ++p)
centroids.sum(points.centroid(p), points[p]);
// Compute average position of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.average(c);
}
int main()
{
Points points;
Centroids centroids(points);
cout << "Starting simulation..." << endl;
double start_time = sys_time();
for (int i=0; i < num_iterations; ++i)
k_means(points, centroids);
cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
cout << "# Points: " << num_points << endl;
cout << "# Centroids: " << num_centroids << endl;
// Write the centroids to a file to give us some crude verification
// of consistency as we make changes.
ofstream out("centroids.txt");
for (int c=0; c < num_centroids; ++c)
out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}
Я знаю об опасности поверхностного тестирования, но, поскольку он уже считается горячей точкой на предыдущих сеансах реального мира, я надеюсь, что это простительно. Меня также интересуют общие методы, связанные с микро-оптимизацией такого кода.
Я получил несколько разные результаты в профилировании этого. Время здесь немного более равномерно распределено внутри цикла, и я не уверен, почему. Возможно, это потому, что данные меньше (я пропустил элементы и вытащил член min_dist
и сделал его локальной переменной). Точное соотношение между центроидами и точками также немного отличается, но, надеюсь, достаточно близко, чтобы перевести улучшения в исходный код. Он также однопоточный в этом поверхностном тесте, и разборка выглядит совсем по-другому, поэтому я могу рискнуть оптимизировать этот поверхностный тест без оригинала (риск, который я готов принять сейчас, так как я больше заинтересован в расширении моих знаний методов, которые могли бы оптимизировать эти случаи, а не решение для этого точного случая).
![VTune]()
Обновление с предложением Yochai Timmer - ~ 12,5 секунд
О, я сталкиваюсь с проблемами микро-оптимизации без понимания сборки очень хорошо. Я заменил это:
-if (min_dist_squared > dist_squared)
-{
- min_dist_squared = dist_squared;
- pt.centroid = c;
-}
При этом:
+const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);
.. только для того, чтобы найти время, увеличенное с ~ 5.6 сек до ~ 12,5 сек. Тем не менее, это не его вина и не отнимает от ценности его решения - это мое за то, что он не понимает, что происходит на уровне машины и берет удары в темноте. Это, по-видимому, пропустило, и, по-видимому, я не был жертвой неправильного предсказания ветки, как я изначально думал. Тем не менее, его предлагаемое решение - замечательная и обобщенная функция, чтобы попробовать в таких случаях, и я благодарен за добавление его в свой набор подсказок и трюков. Теперь для раунда 2.
Решение Harold SIMD - 2.496 секунд (см. оговорку)
Это решение может быть потрясающим. После преобразования репрезентации кластера в SoA, я получаю время ~ 2,5 секунды с этим! К сожалению, похоже, какой-то сбой. Я получаю очень разные результаты для конечного результата, который предлагает более чем небольшие различия в точности, в том числе некоторые центроиды к концу со значениями 0 (подразумевая, что они не были найдены в поиске). Я пытаюсь пройти через SIMD-логику с помощью отладчика, чтобы узнать, что может быть - это может быть просто ошибкой транскрипции с моей стороны, но здесь код, если кто-то может обнаружить ошибку.
Если ошибка может быть исправлена без замедления результатов, это улучшение скорости больше, чем я когда-либо себе представлял из чистой микро-оптимизации!
// New version of Centroids::find_nearest (from harold solution):
int find_nearest(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
Решение Гарольда SIMD (исправлено) - ~ 2,5 с
После применения исправлений и тестирования их результаты не повреждены и будут корректно работать с аналогичными улучшениями в исходной кодовой базе!
Так как это касается святого грааля знаний, который я пытался понять лучше (бездисковый SIMD), я собираюсь наградить решение дополнительными реквизитами для более чем удвоения скорости операции. У меня есть домашнее задание, пытаясь понять это, поскольку моя цель состояла не только в том, чтобы смягчить эту точку доступа, но и в расширении моего личного понимания возможных решений для их решения.
Тем не менее, я благодарен за все вклады от алгоритмических предложений к действительно классному трюку bitselect
! Хотел бы я принять все ответы. В какой-то момент я могу попробовать все из них, но теперь у меня есть домашняя работа, вырезанная в понимании некоторых из этих неарифметических операций SIMD.
int find_nearest_simd(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
__m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
__m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);
__m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
__m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
__m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
}
ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
Ответы
Ответ 1
Слишком плохо, мы не можем использовать SSE4.1, но очень хорошо, SSE2. Я не тестировал это, просто скомпилировал его, чтобы увидеть, есть ли синтаксические ошибки и посмотреть, имеет ли смысл сборка (в основном это хорошо, хотя GCC разливает min_index
даже с некоторыми регистрами xmm
, которые не используются, не уверен, почему бывает)
int find_closest(float *x, float *y, float *z,
float pt_x, float pt_y, float pt_z, int n) {
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
__m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
__m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
__m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i = 4; i < n; i += 4) {
xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
__m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif),
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
index = _mm_add_epi32(index, _mm_set1_epi32(4));
__m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
min_dist = _mm_min_ps(min_dist, dist);
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
}
float mdist[4];
_mm_store_ps(mdist, min_dist);
uint32_t mindex[4];
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i = 1; i < 4; i++) {
if (mdist[i] < closest) {
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
Как обычно, он ожидает, что указатели будут выровнены по 16. Кроме того, отступы должны быть с бесконечными точками (поэтому они никогда не приближаются к цели).
SSE 4.1 позволит вам заменить этот
min_index = _mm_or_si128(_mm_and_si128(index, mask),
_mm_andnot_si128(mask, min_index));
Под этим
min_index = _mm_blendv_epi8(min_index, index, mask);
Здесь версия asm, сделанная для vsyasm, немного протестировала (кажется, работает)
bits 64
section .data
align 16
centroid_four:
dd 4, 4, 4, 4
centroid_index:
dd 0, 1, 2, 3
section .text
global find_closest
proc_frame find_closest
;
; arguments:
; ecx: number of points (multiple of 4 and at least 4)
; rdx -> array of 3 pointers to floats (x, y, z) (the points)
; r8 -> array of 3 floats (the reference point)
;
alloc_stack 0x58
save_xmm128 xmm6, 0
save_xmm128 xmm7, 16
save_xmm128 xmm8, 32
save_xmm128 xmm9, 48
[endprolog]
movss xmm0, [r8]
shufps xmm0, xmm0, 0
movss xmm1, [r8 + 4]
shufps xmm1, xmm1, 0
movss xmm2, [r8 + 8]
shufps xmm2, xmm2, 0
; pointers to x, y, z in r8, r9, r10
mov r8, [rdx]
mov r9, [rdx + 8]
mov r10, [rdx + 16]
; reference point is in xmm0, xmm1, xmm2 (x, y, z)
movdqa xmm3, [rel centroid_index] ; min_index
movdqa xmm4, xmm3 ; current index
movdqa xmm9, [rel centroid_four] ; index increment
paddd xmm4, xmm9
; calculate initial min_dist, xmm5
movaps xmm5, [r8]
subps xmm5, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm5, xmm5
mulps xmm7, xmm7
mulps xmm8, xmm8
addps xmm5, xmm7
addps xmm5, xmm8
add r8, 16
add r9, 16
add r10, 16
sub ecx, 4
jna _tail
_loop:
movaps xmm6, [r8]
subps xmm6, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm6, xmm6
mulps xmm7, xmm7
mulps xmm8, xmm8
addps xmm6, xmm7
addps xmm6, xmm8
add r8, 16
add r9, 16
add r10, 16
movaps xmm7, xmm6
cmpps xmm6, xmm5, 1
minps xmm5, xmm7
movdqa xmm7, xmm6
pand xmm6, xmm4
pandn xmm7, xmm3
por xmm6, xmm7
movdqa xmm3, xmm6
paddd xmm4, xmm9
sub ecx, 4
ja _loop
_tail:
; calculate horizontal minumum
pshufd xmm0, xmm5, 0xB1
minps xmm0, xmm5
pshufd xmm1, xmm0, 0x4E
minps xmm0, xmm1
; find index of the minimum
cmpps xmm0, xmm5, 0
movmskps eax, xmm0
bsf eax, eax
; index into xmm3, sort of
movaps [rsp + 64], xmm3
mov eax, [rsp + 64 + rax * 4]
movaps xmm9, [rsp + 48]
movaps xmm8, [rsp + 32]
movaps xmm7, [rsp + 16]
movaps xmm6, [rsp]
add rsp, 0x58
ret
endproc_frame
В С++:
extern "C" int find_closest(int n, float** points, float* reference_point);
Ответ 2
Вы можете использовать веткистый тернарный оператор, иногда называемый битселек (условие? true: false).
Просто используйте его для 2 участников, не выполнив ничего.
Не беспокойтесь о дополнительных операциях, они ничто по сравнению с ветвлением if.
bitselect реализация:
inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}
inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
//Reinterpret floats. Would work because it just a bit select, no matter the actual value
int& at = reinterpret_cast<int&>(truereturnvalue);
int& af = reinterpret_cast<int&>(falsereturnvalue);
int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
return reinterpret_cast<float&>(res);
}
И ваша петля должна выглядеть так:
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
bool isSmaeller = dist < pt.min_dist;
//use same value if not smaller
pt.min_index = bitselect(isSmaeller, i, pt.min_index);
pt.min_dist = bitselect(isSmaeller, dist, pt.min_dist);
}
Ответ 3
С++ - это язык высокого уровня. Ваше предположение о том, что поток управления в исходном коде С++ преобразуется в инструкции ветвления, является ошибочным. У меня нет определения некоторых типов из вашего примера, поэтому я сделал простую тестовую программу с аналогичными условными присвоениями:
int g(int, int);
int f(const int *arr)
{
int min = 10000, minIndex = -1;
for ( int i = 0; i < 1000; ++i )
{
if ( arr[i] < min )
{
min = arr[i];
minIndex = i;
}
}
return g(min, minIndex);
}
Обратите внимание, что использование undefined "g" заключается в том, чтобы исключить возможность удаления оптимизатора. Я перевел это с помощью g++ 4.9.2 с -O3 и -S в сборку x86_64 (даже не изменив значение по умолчанию для -march), и (не слишком неожиданным) результатом является то, что тело цикла не содержит ветвей
movl (%rdi,%rax,4), %ecx
movl %edx, %r8d
cmpl %edx, %ecx
cmovle %ecx, %r8d
cmovl %eax, %esi
addq $1, %rax
Кроме того, предположение о том, что разветвление обязательно быстрее, также может быть ошибочным, поскольку вероятность того, что новое расстояние "бьет" старое, уменьшает количество элементов, на которые вы смотрели. Это не монета. Трюк "битселек" был изобретен, когда компиляторы были гораздо менее агрессивны при создании сборки "как есть", чем сегодня. Я бы предпочел взглянуть на то, какую сборку создает ваш компилятор, прежде чем пытаться переработать код, чтобы компилятор мог лучше его оптимизировать или взяв результат в качестве основы для рукописной сборки. Если вы хотите изучить SIMD, я бы предложил попробовать подход "минимум минимумов" с уменьшенными зависимостями данных (в моем примере зависимости от "мин", вероятно, являются узким местом).
Ответ 4
Это может пойти в обоих направлениях, но я бы попробовал следующую структуру:
std::vector<float> centDists(num_centroids); //<-- one for each thread.
for (size_t p=0; p<num_points; ++p) {
Point& pt = points[p];
for (size_t c=0; c<num_centroids; ++c) {
const float dist = ...;
centDists[c]=dist;
}
pt.min_idx it= min_element(centDists.begin(),centDists.end())-centDists.begin();
}
Очевидно, вам придется перебирать два раза по памяти, что, вероятно, повредит отношение кеша к пропуску (вы также можете разбить его на поддиапазоны), но, с другой стороны, каждая из внутренних циклов должна быть легко векторизовать и развернуть - так что вам нужно только измерить, стоит ли это.
И даже если вы придерживаетесь своей версии, я бы попытался использовать локальные переменные, чтобы отслеживать минимальный индекс и расстояние и применять результаты, чтобы указать в конце.
Рациональным является то, что каждое чтение или запись на pt.min_dist
эффективно выполняется с помощью указателя, который - в зависимости от оптимизации компилятора - может или не может снизить вашу производительность.
Еще одна важная вещь для векторизации - превратить массив Structs (в данном случае cententroids) в структуру массивов (например, один массив для каждой координаты точек), так как вам не нужны дополнительные команды сбора, чтобы загрузить данные для использования с инструкциями SIMD. См. Эрик Брумер для получения дополнительной информации по этой теме.
EDIT: Некоторые номера для моей системы (haswell, clang 3.5):
Я сделал короткий тест с вашим эталоном и моей системой, выше кода замедлил алгоритм примерно на 10% - по существу, ничто не могло быть векторизованным.
Однако, применяя преобразование AoS к SoA для ваших центроидов, расчет расстояний был векторизован, что привело к сокращению общей продолжительности выполнения около 40% по сравнению с вашей исходной структурой с применением преобразований AoS для SoA.
Ответ 5
Во-первых, я бы посоветовал, что перед тем, как попробовать какие-либо изменения кода, вы посмотрите на разборку в оптимизированной сборке. В идеале вы хотите посмотреть данные профилировщика на уровне сборки. Это может отображать различные вещи, например:
- Компилятор, возможно, не сгенерировал действительную инструкцию ветвления.
- Строка кода с узким местом может содержать гораздо больше инструкций, чем вы могли бы подумать - например, вычисление dist.
В дополнение к этому, стандартный трюк, который, когда вы говорите о вычислениях расстояний, часто требует квадратного корня. Вы должны сделать этот квадратный корень в конце процесса с минимальным квадратом.
SSE может обрабатывать четыре значения одновременно, без каких-либо ветвей, используя _ mm_min_ps. Если вам действительно нужна скорость, вы хотите использовать встроенные функции SSE (или AVX). Вот базовый пример:
float MinimumDistance(const float *values, int count)
{
__m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
int i=0;
for (; i < count - 3; i+=4)
{
__m128 distances = _mm_loadu_ps(&values[i]);
min = _mm_min_ps(min, distances);
}
// Combine the four separate minimums to a single value
min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));
// Deal with the last 0-3 elements the slow way
float result = FLT_MAX;
if (count > 3) _mm_store_ss(&result, min);
for (; i < count; i++)
{
result = min(values[i], result);
}
return result;
}
Для лучшей производительности SSE вы должны убедиться, что нагрузки происходят по согласованным адресам. Вы можете обрабатывать первые несколько несогласованных элементов так же, как и последние несколько в коде выше, если это необходимо.
Другое, на что нужно обратить внимание - это пропускная способность памяти. Если есть несколько членов структуры ClusterCentroid, которые вы не используете в течение этого цикла, тогда вы будете считывать гораздо больше данных из памяти, чем вам действительно нужно, поскольку память читается в кусках размера строки кэша, каждая по 64 байта.
Ответ 6
Возможная микрооптимизация: Храните min_dist и min_index в локальных переменных. Компилятору, возможно, придется чаще писать в память так, как вы его написали; на некоторых архитектурах это может иметь большое влияние на производительность. См. мой ответ здесь для другого примера.
Предложение Адамса о выполнении 4 сравнений сразу также является хорошим.
Тем не менее, ваше лучшее ускорение будет связано с уменьшением количества центроидов, которые вы должны проверить. В идеале, постройте kd-дерево (или подобное) вокруг центроидов, затем запросите, чтобы найти ближайшую точку.
Если у вас нет кода дерева, лежащего вокруг, вот мой самый любимый поисковый запрос "бедный человек":
Sort the points by one coordinate, e.g. cent.pos[0]
Pick a starting index for the query point (pt)
Iterate forwards through the candidate points until you reach the end, OR when abs(pt.pos[0] - cent.pos[0]) > min_dist
Repeat the previous step going the opposite direction.
Дополнительное условие остановки для поиска означает, что вы должны пропустить достаточное количество очков; вы также не должны пропускать все точки ближе, чем те, которые вы уже нашли.
Итак, для вашего кода это выглядит примерно так:
// sort centroid by x coordinate.
min_index = -1;
min_dist = numeric_limits<float>::max();
// pick the start index. This works well if the points are evenly distributed.
float min_x = centroids[0].pos[0];
float max_x = centroids[num_centroids-1].pos[0];
float cur_x = pt.pos[0];
float t = (max_x - cur_x) / (max_x - min_x);
// TODO clamp t between 0 and 1
int start_index = int(t * float(num_centroids))
// Forward search
for (int i=start_index ; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
if (fabs(cent.pos[0] - pt.pos[0]) > min_i)
// Everything to the right of this must be further min_dist, so break.
// This is where the savings comes from!
break;
const float dist = ...;
if (dist < min_dist)
{
min_dist = dist;
min_index = i;
}
}
// Backwards search
for (int i=start_index ; i >= 0; --i)
{
// same as above
}
pt.min_dist = min_dist
pt.min_index = min_index
(Обратите внимание, что это предполагает, что вы вычисляете расстояние между точками, но ваша сборка указывает на квадрат расстояния. Соответственно отрегулируйте условие разрыва).
При создании дерева или сортировке центроидов незначительные накладные расходы, но это должно быть компенсировано за счет ускорения вычислений в более крупном цикле (по количеству точек).