Java sum two double [] [] с параллельным потоком
Скажем, у меня есть две матрицы:
double[][] a = new double[2][2]
a[0][0] = 1
a[0][1] = 2
a[1][0] = 3
a[1][1] = 4
double[][] b = new double[2][2]
b[0][0] = 1
b[0][1] = 2
b[1][0] = 3
b[1][1] = 4
традиционным способом, чтобы суммировать эти матрицы, я бы сделал вложенный цикл:
int rows = a.length;
int cols = a[0].length;
double[][] res = new double[rows][cols];
for(int i = 0; i < rows; i++){
for(int j = 0; j < cols; j++){
res[i][j] = a[i][j] + b[i][j];
}
}
Я новичок в потоковом API, но я думаю, что это очень удобно использовать с parallelStream
, поэтому мой вопрос в том, есть ли способ сделать это и использовать параллельную обработку?
Изменить: не уверен, что это подходящее место, но здесь мы идем:
Используя некоторые предложения, я поместил Stream в тест. Настройка была такой:
Классический подход:
public class ClassicMatrix {
private final double[][] components;
private final int cols;
private final int rows;
public ClassicMatrix(final double[][] components){
this.components = components;
this.rows = components.length;
this.cols = components[0].length;
}
public ClassicMatrix addComponents(final ClassicMatrix a) {
final double[][] res = new double[rows][cols];
for (int i = 0; i < rows; i++) {
for (int j = 0; j < rows; j++) {
res[i][j] = components[i][j] + a.components[i][j];
}
}
return new ClassicMatrix(res);
}
}
Использование предложения @dkatzel:
public class MatrixStream1 {
private final double[][] components;
private final int cols;
private final int rows;
public MatrixStream1(final double[][] components){
this.components = components;
this.rows = components.length;
this.cols = components[0].length;
}
public MatrixStream1 addComponents(final MatrixStream1 a) {
final double[][] res = new double[rows][cols];
IntStream.range(0, rows*cols).parallel().forEach(i -> {
int x = i/rows;
int y = i%rows;
res[x][y] = components[x][y] + a.components[x][y];
});
return new MatrixStream1(res);
}
}
Используя предложение @Eugene:
public class MatrixStream2 {
private final double[][] components;
private final int cols;
private final int rows;
public MatrixStream2(final double[][] components) {
this.components = components;
this.rows = components.length;
this.cols = components[0].length;
}
public MatrixStream2 addComponents(final MatrixStream2 a) {
final double[][] res = new double[rows][cols];
IntStream.range(0, rows)
.forEach(i -> Arrays.parallelSetAll(res[i], j -> components[i][j] * a.components[i][j]));
return new MatrixStream2(res);
}
}
и тестовый класс, выполняющий 3 независимых раза по одному для каждого метода (просто заменив имя метода в main()):
public class MatrixTest {
private final static String path = "/media/manuel/workspace/data/";
public static void main(String[] args) {
final List<Double[]> lst = new ArrayList<>();
for (int i = 100; i < 8000; i = i + 400) {
final Double[] d = testClassic(i);
System.out.println(d[0] + " : " + d[1]);
lst.add(d);
}
IOUtils.saveToFile(path + "classic.csv", lst);
}
public static Double[] testClassic(final int i) {
final ClassicMatrix a = new ClassicMatrix(rand(i));
final ClassicMatrix b = new ClassicMatrix(rand(i));
final long start = System.currentTimeMillis();
final ClassicMatrix mul = a.addComponents(b);
final long now = System.currentTimeMillis();
final double elapsed = (now - start);
return new Double[] { (double) i, elapsed };
}
public static Double[] testStream1(final int i) {
final MatrixStream1 a = new MatrixStream1(rand(i));
final MatrixStream1 b = new MatrixStream1(rand(i));
final long start = System.currentTimeMillis();
final MatrixStream1 mul = a.addComponents(b);
final long now = System.currentTimeMillis();
final double elapsed = (now - start);
return new Double[] { (double) i, elapsed };
}
public static Double[] testStream2(final int i) {
final MatrixStream2 a = new MatrixStream2(rand(i));
final MatrixStream2 b = new MatrixStream2(rand(i));
final long start = System.currentTimeMillis();
final MatrixStream2 mul = a.addComponents(b);
final long now = System.currentTimeMillis();
final double elapsed = (now - start);
return new Double[] { (double) i, elapsed };
}
private static double[][] rand(final int size) {
final double[][] rnd = new double[size][size];
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
rnd[i][j] = Math.random();
}
}
return rnd;
}
}
Результаты:
Classic Matrix size, Time (ms)
100.0,1.0
500.0,5.0
900.0,5.0
1300.0,43.0
1700.0,94.0
2100.0,26.0
2500.0,33.0
2900.0,46.0
3300.0,265.0
3700.0,71.0
4100.0,87.0
4500.0,380.0
4900.0,432.0
5300.0,215.0
5700.0,238.0
6100.0,577.0
6500.0,677.0
6900.0,609.0
7300.0,584.0
7700.0,592.0
Stream1, Time(ms)
100.0,86.0
500.0,13.0
900.0,9.0
1300.0,47.0
1700.0,92.0
2100.0,29.0
2500.0,33.0
2900.0,46.0
3300.0,253.0
3700.0,71.0
4100.0,90.0
4500.0,352.0
4900.0,373.0
5300.0,497.0
5700.0,485.0
6100.0,579.0
6500.0,711.0
6900.0,800.0
7300.0,780.0
7700.0,902.0
Stream2, Time(ms)
100.0,111.0
500.0,42.0
900.0,12.0
1300.0,54.0
1700.0,97.0
2100.0,110.0
2500.0,177.0
2900.0,71.0
3300.0,250.0
3700.0,106.0
4100.0,359.0
4500.0,143.0
4900.0,233.0
5300.0,261.0
5700.0,289.0
6100.0,406.0
6500.0,814.0
6900.0,830.0
7300.0,828.0
7700.0,911.0
Я сделал заговор для лучшего сравнения:
![Тест производительности]()
Нет никакого улучшения. Где ошибка? Являются ли матрицы малыми (7700 x 7700)? Более того, это взрывает память моего компьютера.
Ответы
Ответ 1
Один из способов сделать это можно с помощью Arrays.parallelSetAll
:
int rows = a.length;
int cols = a[0].length;
double[][] res = new double[rows][cols];
Arrays.parallelSetAll(res, i -> {
Arrays.parallelSetAll(res[i], j -> a[i][j] + b[i][j]);
return res[i];
});
Я не уверен на 100%, но я думаю, что внутренний вызов Arrays.parallelSetAll
может не стоить накладных расходов на создание внутренней распараллеливания для каждого столбца строки. Возможно, этого достаточно, чтобы распараллелить сумму только для каждой строки:
Arrays.parallelSetAll(res, i -> {
Arrays.setAll(res[i], j -> a[i][j] + b[i][j]);
return res[i];
});
В любом случае, вы должны тщательно измерить, прежде чем добавлять параллелизм в алгоритм, потому что много раз накладные расходы настолько велики, что его не стоит использовать.
Ответ 2
Это еще не измерено (я немного позже), но не должно ли уже построить в Arrays.parallelSetAll
выполнить работу самым быстрым способом?
for (int i = 0; i < a.length; ++i) {
int j = i;
Arrays.parallelSetAll(r[j], x -> a[j][x] + b[j][x]);
}
Или даже приятнее:
IntStream.range(0, a.length)
.forEach(i -> Arrays.parallelSetAll(r[i], j -> a[i][j] + b[i][j]));
Это очень хорошо сочетается с кэшами процессора, так как вероятность того, что следующая запись находится в одной и той же строке кэша, велика. Выполнение чтения в обратном порядке (столбцы и строки) будет рассеивать чтение по всему месту.
Я поставил jmh test здесь. Обратите внимание, что Федерико ответ является самым быстрым. Подумайте о своей идее.
Вот результаты:
Benchmark (howManyEntries) Mode Cnt Score Error Units
DoubleArraySum.dkatzel 100 avgt 10 0.055 ± 0.005 ms/op
DoubleArraySum.dkatzel 500 avgt 10 0.997 ± 0.156 ms/op
DoubleArraySum.dkatzel 1000 avgt 10 4.162 ± 0.368 ms/op
DoubleArraySum.dkatzel 3000 avgt 10 39.619 ± 4.391 ms/op
DoubleArraySum.dkatzel 8000 avgt 10 236.468 ± 41.599 ms/op
DoubleArraySum.eugene 100 avgt 10 0.671 ± 0.187 ms/op
DoubleArraySum.eugene 500 avgt 10 6.317 ± 0.268 ms/op
DoubleArraySum.eugene 1000 avgt 10 14.751 ± 0.676 ms/op
DoubleArraySum.eugene 3000 avgt 10 65.174 ± 6.044 ms/op
DoubleArraySum.eugene 8000 avgt 10 285.571 ± 23.206 ms/op
DoubleArraySum.federico1 100 avgt 10 0.169 ± 0.010 ms/op
DoubleArraySum.federico1 500 avgt 10 1.999 ± 0.217 ms/op
DoubleArraySum.federico1 1000 avgt 10 6.087 ± 1.108 ms/op
DoubleArraySum.federico1 3000 avgt 10 40.825 ± 4.853 ms/op
DoubleArraySum.federico1 8000 avgt 10 267.446 ± 37.490 ms/op
DoubleArraySum.federico2 100 avgt 10 0.034 ± 0.003 ms/op
DoubleArraySum.federico2 500 avgt 10 0.974 ± 0.152 ms/op
DoubleArraySum.federico2 1000 avgt 10 3.245 ± 0.080 ms/op
DoubleArraySum.federico2 3000 avgt 10 30.503 ± 5.960 ms/op
DoubleArraySum.federico2 8000 avgt 10 183.183 ± 21.861 ms/op
DoubleArraySum.holijava 100 avgt 10 0.063 ± 0.002 ms/op
DoubleArraySum.holijava 500 avgt 10 1.112 ± 0.020 ms/op
DoubleArraySum.holijava 1000 avgt 10 4.138 ± 0.062 ms/op
DoubleArraySum.holijava 3000 avgt 10 41.784 ± 1.029 ms/op
DoubleArraySum.holijava 8000 avgt 10 266.590 ± 4.080 ms/op
DoubleArraySum.pivovarit 100 avgt 10 0.112 ± 0.002 ms/op
DoubleArraySum.pivovarit 500 avgt 10 2.427 ± 0.075 ms/op
DoubleArraySum.pivovarit 1000 avgt 10 9.572 ± 0.355 ms/op
DoubleArraySum.pivovarit 3000 avgt 10 84.413 ± 2.197 ms/op
DoubleArraySum.pivovarit 8000 avgt 10 690.942 ± 34.993 ms/op
ИЗМЕНИТЬ
здесь более читаемый вывод (federico выигрывает со всеми входами)
100=[federico2, dkatzel, holijava, pivovarit, federico1, eugene]
500=[federico2, dkatzel, holijava, federico1, pivovarit, eugene]
1000=[federico2, holijava, dkatzel, federico1, pivovarit, eugene]
3000=[federico2, dkatzel, federico1, holijava, eugene, pivovarit]
8000=[federico2, dkatzel, holijava, federico1, eugene, pivovarit]
Ответ 3
Единственная опция, которую я вижу здесь, - это больше/меньше генерировать все возможные пары индексов, а затем извлекать элементы и применять суммирование. Использование параллельных потоков не будет иметь никакого дополнительного положительного эффекта здесь с таким небольшим примером, но вы можете с уверенностью использовать Stream API здесь (и сразу же конвертировать в параллель), хотя результат не так хорош, как ожидалось:
IntStream.range(0, a.length).boxed()
.flatMap(i -> IntStream.range(0, a[0].length)
.mapToObj(j -> new AbstractMap.SimpleImmutableEntry<>(i, j)))
.parallel()
.forEach(e -> {
res[e.getKey()][e.getValue()]
= a[e.getKey()][e.getValue()] + b[e.getKey()][e.getValue()];
});
Нам нужно ввести посредника (middlepair?), чтобы мы могли распараллелить один Stream
, а не играть с распараллеленным вложенным Streams
.
Еще один расширенный способ - реализовать собственный пользовательский коллекционер, но в какой-то момент он будет включать в себя вложенную петлю.
Истинную силу Stream API можно наблюдать при попытке суммировать все значения из двух массивов:
Stream.concat(Arrays.stream(a), Arrays.stream(b)).parallel()
.flatMapToDouble(Arrays::stream)
.sum();
Ответ 4
Вы можете использовать IntStream
для создания потока по числу ячеек в матрице, а затем выполнить некоторую математику для преобразования этого int в местоположение матрицы.
IntStream.range(0, rows*cols)
.parallel()
.forEach( i->{
int x = i/rows;
int y = i%rows;
res[x][y] = a[x][y] + b[x][y];
});
Другие ответы на этот вопрос не только ошибочны (на момент написания этой статьи), но и создают несколько потоков, которые влияют на производительность, а также даже не параллельны.
Как отмечает @Holger, в то время как этот единственный поток может быть проще читать, затраты на производительность делений и модуля будут делать его медленнее, чем поток потоков с добавками только до тех пор, пока не будет много ядер. Я не уверен, сколько потребуется для компенсации
Ответ 5
Как насчет этого?
double[][] res = IntStream.range(0, a.length).parallel()
.mapToObj(i ->
IntStream.range(0, a[i].length)
.mapToDouble(j -> a[i][j] + b[i][j])
.toArray()
)
.toArray(double[][]::new);
System.out.println(res);
// ^--- [[2., 4.], [6., 8.]]