Как ускорить мой разрешенный матричный решатель?
Я пишу разреженный матричный решатель, используя метод Гаусса-Зейделя. По профилированию я решил, что около половины моего времени программы проводится внутри решателя. Критическая часть производительности выглядит следующим образом:
size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
for (size_t x = 1; x < d_nx - 1; ++x) {
d_x[ic] = d_b[ic]
- d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
- d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
++ic; ++iw; ++ie; ++is; ++in;
}
ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}
Все задействованные массивы имеют тип float
. На самом деле это не массивы, а объекты с перегруженным оператором []
, который (я думаю) должен быть оптимизирован, но определяется следующим образом:
inline float &operator[](size_t i) { return d_cells[i]; }
inline float const &operator[](size_t i) const { return d_cells[i]; }
Для d_nx = d_ny = 128
, это может быть запущено около 3500 раз в секунду на Intel i7 920. Это означает, что тело внутреннего цикла работает 3500 * 128 * 128 = 57 миллионов раз в секунду. Поскольку задействована только какая-то простая арифметика, это поражает меня как низкое число для процессора с частотой 2,66 ГГц.
Возможно, это не ограничено мощностью процессора, а пропускной способностью памяти? Ну, один массив 128 * 128 float
питается 65 кБ, поэтому все 6 массивов должны легко вписываться в кеш процессора L3 (что составляет 8 МБ). Предполагая, что ничего не кэшируется в регистрах, я подсчитываю 15 обращений к памяти во внутреннем цикле. В 64-битной системе это составляет 120 байт на итерацию, поэтому 57 миллионов * 120 байт = 6,8 ГБ/с. Кэш L3 работает на частоте 2,66 ГГц, поэтому он имеет тот же порядок величины. Я предполагаю, что память действительно является узким местом.
Чтобы ускорить это, я попытался выполнить следующее:
-
Скомпилируйте с помощью g++ -O3
. (Ну, я делал это с самого начала.)
-
Параллелизация более 4 ядер с использованием OpenMP-прагм. Я должен перейти к алгоритму Якоби, чтобы избежать чтения и записи в тот же массив. Это требует, чтобы я делал в два раза больше итераций, что приводило к чистому результату примерно той же скорости.
-
Реализация деталей реализации тела цикла, например, с использованием указателей вместо индексов. Нет эффекта.
Какой лучший способ ускорить этот парень? Помогло бы оно переписать внутреннее тело в собрании (я должен был бы это узнать первым)? Должен ли я запускать это на графическом процессоре (что я знаю, как это сделать, но это такая проблема)? Любые другие яркие идеи?
(N.B. Я принимаю "нет" для ответа, как в: "это невозможно сделать значительно быстрее, потому что..." )
Обновить: по запросу, здесь полная программа:
#include <iostream>
#include <cstdlib>
#include <cstring>
using namespace std;
size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;
void step() {
size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
for (size_t x = 1; x < d_nx - 1; ++x) {
d_x[ic] = d_b[ic]
- d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
- d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
++ic; ++iw; ++ie; ++is; ++in;
}
ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}
}
void solve(size_t iters) {
for (size_t i = 0; i < iters; ++i) {
step();
}
}
void clear(float *a) {
memset(a, 0, d_nx * d_ny * sizeof(float));
}
int main(int argc, char **argv) {
size_t n = d_nx * d_ny;
d_x = new float[n]; clear(d_x);
d_b = new float[n]; clear(d_b);
d_w = new float[n]; clear(d_w);
d_e = new float[n]; clear(d_e);
d_s = new float[n]; clear(d_s);
d_n = new float[n]; clear(d_n);
solve(atoi(argv[1]));
cout << d_x[0] << endl; // prevent the thing from being optimized away
}
Я компилирую и запускаю его следующим образом:
$ g++ -o gstest -O3 gstest.cpp
$ time ./gstest 8000
0
real 0m1.052s
user 0m1.050s
sys 0m0.010s
(Он делает 8000 вместо 3500 итераций в секунду, потому что моя "настоящая" программа также делает много других вещей, но она репрезентативная.)
Обновление 2: мне сказали, что унифицированные значения могут не быть репрезентативными, потому что значения NaN и Inf могут замедлять работу. Теперь очистка памяти в примере кода. Тем не менее, это не имеет никакого значения для меня в скорости выполнения.
Ответы
Ответ 1
Пара идей:
-
Используйте SIMD. Вы можете загружать 4 поплавки за раз из каждого массива в регистр SIMD (например, SSE на Intel, VMX на PowerPC). Недостатком этого является то, что некоторые из значений d_x будут "устаревшими", поэтому ваш коэффициент конвергенции будет страдать (но не так плохо, как итерация jacobi); трудно сказать, ускоряет ли его ускорение.
-
Используйте SOR. Он прост, не добавляет большого количества вычислений и может значительно улучшить коэффициент конвергенции даже при относительно консервативной релаксации (скажем, 1.5).
-
Используйте сопряженный градиент. Если это для этапа проекции моделирования текучей среды (т.е. Обеспечения несжимаемости), вы должны иметь возможность применять CG и получать гораздо лучшую скорость конвергенции. Хороший предобуславливатель помогает еще больше.
-
Используйте специализированный решатель. Если линейная система возникает из уравнения Пуассона, вы можете сделать даже лучше, чем сопряженный градиент, используя методы на основе FFT.
Если вы можете объяснить больше о том, как выглядит система, которую вы пытаетесь решить, я могу, возможно, дать еще несколько советов о # 3 и # 4.
Ответ 2
Я думаю, что мне удалось оптимизировать его, вот код, создать новый проект в VС++, добавить этот код и просто скомпилировать под "Release".
#include <iostream>
#include <cstdlib>
#include <cstring>
#define _WIN32_WINNT 0x0400
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <conio.h>
using namespace std;
size_t d_nx = 128, d_ny = 128;
float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n;
void step_original() {
size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
for (size_t x = 1; x < d_nx - 1; ++x) {
d_x[ic] = d_b[ic]
- d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
- d_s[ic] * d_x[is] - d_n[ic] * d_x[in];
++ic; ++iw; ++ie; ++is; ++in;
}
ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}
}
void step_new() {
//size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
float
*d_b_ic,
*d_w_ic,
*d_e_ic,
*d_x_ic,
*d_x_iw,
*d_x_ie,
*d_x_is,
*d_x_in,
*d_n_ic,
*d_s_ic;
d_b_ic = d_b;
d_w_ic = d_w;
d_e_ic = d_e;
d_x_ic = d_x;
d_x_iw = d_x;
d_x_ie = d_x;
d_x_is = d_x;
d_x_in = d_x;
d_n_ic = d_n;
d_s_ic = d_s;
for (size_t y = 1; y < d_ny - 1; ++y)
{
for (size_t x = 1; x < d_nx - 1; ++x)
{
/*d_x[ic] = d_b[ic]
- d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie]
- d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/
*d_x_ic = *d_b_ic
- *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie
- *d_s_ic * *d_x_is - *d_n_ic * *d_x_in;
//++ic; ++iw; ++ie; ++is; ++in;
d_b_ic++;
d_w_ic++;
d_e_ic++;
d_x_ic++;
d_x_iw++;
d_x_ie++;
d_x_is++;
d_x_in++;
d_n_ic++;
d_s_ic++;
}
//ic += 2; iw += 2; ie += 2; is += 2; in += 2;
d_b_ic += 2;
d_w_ic += 2;
d_e_ic += 2;
d_x_ic += 2;
d_x_iw += 2;
d_x_ie += 2;
d_x_is += 2;
d_x_in += 2;
d_n_ic += 2;
d_s_ic += 2;
}
}
void solve_original(size_t iters) {
for (size_t i = 0; i < iters; ++i) {
step_original();
}
}
void solve_new(size_t iters) {
for (size_t i = 0; i < iters; ++i) {
step_new();
}
}
void clear(float *a) {
memset(a, 0, d_nx * d_ny * sizeof(float));
}
int main(int argc, char **argv) {
size_t n = d_nx * d_ny;
d_x = new float[n]; clear(d_x);
d_b = new float[n]; clear(d_b);
d_w = new float[n]; clear(d_w);
d_e = new float[n]; clear(d_e);
d_s = new float[n]; clear(d_s);
d_n = new float[n]; clear(d_n);
if(argc < 3)
printf("app.exe (x)iters (o/n)algo\n");
bool bOriginalStep = (argv[2][0] == 'o');
size_t iters = atoi(argv[1]);
/*printf("Press any key to start!");
_getch();
printf(" Running speed test..\n");*/
__int64 freq, start, end, diff;
if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq))
throw "Not supported!";
freq /= 1000000; // microseconds!
{
::QueryPerformanceCounter((LARGE_INTEGER*)&start);
if(bOriginalStep)
solve_original(iters);
else
solve_new(iters);
::QueryPerformanceCounter((LARGE_INTEGER*)&end);
diff = (end - start) / freq;
}
printf("Speed (%s)\t\t: %u\n", (bOriginalStep ? "original" : "new"), diff);
//_getch();
//cout << d_x[0] << endl; // prevent the thing from being optimized away
}
Запустите его следующим образом:
app.exe 10000 o
app.exe 10000 n
"o" означает старый код, ваш.
"n" - мой, новый.
Мои результаты:
Скорость (оригинал):
1515028
1523171
1495988
Скорость (новая):
966012
984110
1006045
Улучшение около 30%.
Логика:
Вы используете счетчики индексов для доступа/управления.
Я использую указатели.
Во время работы точка останова на определенной строке кода калькуляции в отладчике VС++ и нажмите F8. Вы получите окно дизассемблера.
Вы увидите созданные коды операций (код сборки).
В любом случае, посмотрите:
int * x =...;
x [3] = 123;
Это говорит ПК поставить указатель x в регистр (например, EAX).
Добавьте его (3 * sizeof (int)).
Только тогда установите значение 123.
Подход указателей намного лучше, чем вы можете понять, потому что мы сокращаем процесс добавления, на самом деле мы справляемся с этим сами, поэтому можем оптимизировать по мере необходимости.
Надеюсь, это поможет.
Поделитесь с сотрудниками stackoverflow.com:
Отличный сайт, надеюсь, я давно об этом слышал!
Ответ 3
С одной стороны, здесь, похоже, проблема конвейерной обработки. Цикл читается из значения в d_x
, которое только что было написано, но, видимо, оно должно дождаться завершения этой записи. Просто переставляя порядок вычислений, делая что-то полезное во время ожидания, он делает это почти в два раза быстрее:
d_x[ic] = d_b[ic]
- d_e[ic] * d_x[ie]
- d_s[ic] * d_x[is] - d_n[ic] * d_x[in]
- d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;
Это был Eamon Nerbonne, который понял это. У него много оборотов! Я бы никогда не догадался.
Ответ 4
Ответ Poni выглядит как правильный для меня.
Я просто хочу указать, что в этом типе проблемы вы часто получаете преимущества от локализации памяти. Прямо сейчас массивы b,w,e,s,n
находятся в разных местах памяти. Если бы вы не смогли решить проблему в кеше L3 (в основном в L2), это было бы плохо, и решение такого рода было бы полезно:
size_t d_nx = 128, d_ny = 128;
float *d_x;
struct D { float b,w,e,s,n; };
D *d;
void step() {
size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
for (size_t y = 1; y < d_ny - 1; ++y) {
for (size_t x = 1; x < d_nx - 1; ++x) {
d_x[ic] = d[ic].b
- d[ic].w * d_x[iw] - d[ic].e * d_x[ie]
- d[ic].s * d_x[is] - d[ic].n * d_x[in];
++ic; ++iw; ++ie; ++is; ++in;
}
ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}
}
void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); }
void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); }
int main(int argc, char **argv) {
size_t n = d_nx * d_ny;
d_x = new float[n]; clear(d_x);
d = new D[n]; memset(d,0,n * sizeof(D));
solve(atoi(argv[1]));
cout << d_x[0] << endl; // prevent the thing from being optimized away
}
Например, это решение на 1280x1280 немного меньше, чем в 2 раза быстрее, чем решение Poni (13s против 23s в моем тесте - ваша первоначальная реализация - это 22 с), тогда как в 128x128 он на 30% медленнее (7s против 10s- - ваш оригинал - 10 с).
(Итерации были увеличены до 80000 для базового футляра и 800 для случая размером 100x 1280x1280.)
Ответ 5
Я думаю, вы правы в том, что память является узким местом. Это довольно простой цикл с простой арифметикой для каждой итерации. ic, iw, т.е. есть, а в индексах, по-видимому, находятся на противоположных сторонах матрицы, поэтому я предполагаю, что там не хватает кеша.
Ответ 6
Я не эксперт по этому вопросу, но я видел, что есть несколько научных статей об улучшении использования кеша в Метод Гаусса-Зейделя.
Еще одна возможная оптимизация - использование варианта red-black, где точки обновляются двумя разметками в виде шахматной доски. Таким образом, все обновления в развертке независимы и могут быть распараллелены.
Ответ 7
Я предлагаю ввести некоторые предзадачные заявления, а также исследовать "ориентированный на данные дизайн":
void step_original() {
size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1;
float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic;
float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic;
for (size_t y = 1; y < d_ny - 1; ++y) {
for (size_t x = 1; x < d_nx - 1; ++x) {
// Perform the prefetch
// Sorting these statements by array may increase speed;
// although sorting by index name may increase speed too.
db_ic = d_b[ic];
dw_ic = d_w[ic];
dx_iw = d_x[iw];
de_ic = d_e[ic];
dx_ie = d_x[ie];
ds_ic = d_s[ic];
dx_is = d_x[is];
dn_ic = d_n[ic];
dx_in = d_x[in];
// Calculate
d_x[ic] = db_ic
- dw_ic * dx_iw - de_ic * dx_ie
- ds_ic * dx_is - dn_ic * dx_in;
++ic; ++iw; ++ie; ++is; ++in;
}
ic += 2; iw += 2; ie += 2; is += 2; in += 2;
}
}
Это отличается от вашего второго метода, поскольку значения копируются в локальные временные переменные перед выполнением вычисления.