Объяснение агрегатной функции scala
Я еще не понимаю, что такое агрегатная функция:
Например, имея:
val x = List(1,2,3,4,5,6)
val y = x.par.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x,y) => (x._1 + y._1, x._2 + y._2))
Результат будет: (21,6)
Ну, я думаю, что (x,y) => (x._1 + y._1, x._2 + y._2)
должен получить результат параллельно, например, он будет (1 + 2, 1 + 1) и т.д.
Но именно эта часть меня путает:
(x, y) => (x._1 + y, x._2 + 1)
почему x._1 + y
? и здесь x._2
есть 0
?
Спасибо заранее.
Ответы
Ответ 1
В документации :
def aggregate[B](z: ⇒ B)(seqop: (B, A) ⇒ B, combop: (B, B) ⇒ B): B
Сопоставляет результаты применения оператора к последующим элементам.
Это более общая форма сгиба и сокращение. Аналогично семантики, но не требует, чтобы результат был супертипом тип элемента. Он перемещает элементы в разных разделах последовательно, используя seqop для обновления результата, а затем применяется combop к результатам из разных разделов. Реализация эта операция может работать на произвольном количестве сборов разделов, поэтому combop может вызываться произвольным числом раз.
Например, можно обрабатывать некоторые элементы, а затем производить множество. В этом случае seqop обработает элемент и добавит его в список, в то время как combop объединяет два списка из разных разделов. Начальное значение z будет пустым.
pc.aggregate(Set[Int]())(_ += process(_), _ ++ _)
Другой пример: вычисление геометрического среднего из набора двойников (один обычно для этого требуются большие удваивания). B тип накопленного результаты z начальное значение для накопленного результата раздел - это, как правило, нейтральный элемент для seqop оператора (например, Nil для конкатенации списка или 0 для суммирования) и может оцениваться более одного раза подряд оператором, используемым для накопления результаты в группе разделяют ассоциативный оператор, используемый для объединить результаты из разных разделов
В вашем примере B
есть Tuple2[Int, Int]
. Метод seqop
затем берет из списка один элемент из списка, в виде y
и обновляет агрегат B
до (x._1 + y, x._2 + 1)
. Таким образом, он увеличивает второй элемент в кортеже. Это эффективно помещает сумму элементов в первый элемент кортежа и количество элементов во второй элемент кортежа.
Метод combop
затем берет результаты из каждого потока параллельного выполнения и объединяет их. Комбинация с помощью добавления дает те же результаты, что и в том случае, если они выполнялись в списке последовательно.
Использование B
в качестве кортежа, вероятно, является путаной частью этого. Вы можете разбить проблему на две проблемы, чтобы лучше понять, что это делает. res0
- это первый элемент кортежа результата, а res1
- второй элемент в корте. результатов.
// Sums all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + y, (x, y) => x + y)
res0: Int = 21
// Counts all elements in parallel.
scala> x.par.aggregate(0)((x, y) => x + 1, (x, y) => x + y)
res1: Int = 6
Ответ 2
Прежде всего, благодаря ответу Диего, который помог мне связать точки в понимании функции aggregate().
Позвольте мне признаться, что я не мог спать прошлой ночью должным образом, потому что я не мог понять, как aggregate() работает внутри, я сегодня хорошо поспаю: -)
Давайте начнем понимать его
val result = List(1,2,3,4,5,6,7,8,9,10).par.aggregate((0, 0))
(
(x, y) => (x._1 + y, x._2 + 1),
(x,y) =>(x._1 + y._1, x._2 + y._2)
)
результат: (Int, Int) = (55,10)
агрегированная функция имеет 3 части:
- начальное значение аккумуляторов: кортеж (0,0) здесь
- seqop: он работает как foldLeft с начальным значением 0
- combop: он объединяет результат, созданный посредством распараллеливания (эта часть мне трудно понять)
Познайте все 3 части независимо:
part-1: Начальный набор (0,0)
Агрегат() начинается с начального значения аккумуляторов x, которое здесь (0,0). Для вычисления суммы используется первый кортеж x._1, который изначально равен 0. Второй набор x._2 используется для вычисления общего количества элементов в списке.
part-2: (x, y) = > (x._1 + y, x._2 + 1)
Если вы знаете, как foldLeft работает в scala, тогда это должно быть легко понять эту часть. Выше функция работает так же, как foldLeft в нашем списке (1,2,3,4... 10).
Iteration# (x._1 + y, x._2 + 1)
1 (0+1, 0+1)
2 (1+2, 1+1)
3 (3+3, 2+1)
4 (6+4, 3+1)
. ....
. ....
10 (45+10, 9+1)
Таким образом, после всех 10 итераций вы получите результат (55,10).
Если вы понимаете эту часть, остальное очень легко, но для меня это была самая трудная часть в понимании того, что все необходимые вычисления закончены, то что использует вторая часть, то есть compop - следите за обновлениями: -)
часть 3: (x, y) = > (x._1 + y._1, x._2 + y._2)
Ну, эта третья часть - combOp, которая объединяет результат, сгенерированный разными потоками во время распараллеливания, помните, что мы использовали 'par' в нашем коде, чтобы включить параллельное вычисление списка:
Список (1,2,3,4,5,6,7,8,9,10).par.aggregate(....)
Исход Apache эффективно использует агрегатную функцию для параллельного вычисления RDD.
Предположим, что наш список (1,2,3,4,5,6,7,8,9,10) вычисляется тремя потоками параллельно. Здесь каждый поток работает с неполным списком, а затем наш combate() combOp будет комбинировать результат вычисления каждого потока с помощью приведенного ниже кода:
(x,y) =>(x._1 + y._1, x._2 + y._2)
Исходный список: Список (1,2,3,4,5,6,7,8,9,10)
Thread1 начинает вычисление в частичном списке say (1,2,3,4), Thread2 вычисляет (5,6,7,8) и Thread3 вычисляет частичный список say (9,10)
В конце вычисления результат Thread-1 будет равен (10,4), результат Thread-2 будет (26,4), а результат Thread-3 будет (19,2).
В конце параллельного вычисления мы будем иметь ((10,4), (26,4), (19,2))
Iteration# (x._1 + y._1, x._2 + y._2)
1 (0+10, 0+4)
2 (10+26, 4+4)
3 (36+19, 8+2)
который равен (55,10).
Наконец, позвольте мне повторить итерацию этого задания seqOp, чтобы вычислить сумму всех элементов списка и общее количество списков, тогда как коммандовое задание должно сочетать разные частичные результаты, сгенерированные во время распараллеливания.
Надеюсь, что приведенное выше объяснение поможет вам понять агрегат().
Ответ 3
aggregate принимает 3 параметра: начальное значение, функцию вычисления и комбинационную функцию.
То, что он делает, в основном разбивает коллекцию на несколько потоков, вычисляет частичные результаты, используя функцию вычисления, а затем объединяет все эти частичные результаты с помощью комбинированной функции.
Из того, что я могу сказать, ваша примерная функция вернет пару (a, b), где a - сумма значений в списке, b - количество значений в списке. Действительно, (21, 6).
Как это работает? Начальное значение - пара (0,0). Для пустого списка мы имеем сумму 0 и число элементов 0, поэтому это правильно.
Ваша вычислительная функция принимает пару (Int, Int) x, которая является вашим частичным результатом, и Int y, которая является следующим значением в списке. Это ваш:
(x, y) => (x._1 + y, x._2 + 1)
В самом деле, результат, который мы хотим, состоит в том, чтобы увеличить левый элемент x (аккумулятора) на y и правый элемент x (счетчик) на 1 для каждого y.
Ваша комбинированная функция принимает пару (Int, Int) x и пару (Int, Int) y, которые являются вашими двумя частичными результатами из разных параллельных вычислений, и объединяет их как:
(x,y) => (x._1 + y._1, x._2 + y._2)
Действительно, мы суммируем независимо левые части пар и правые части пар.
Ваше замешательство возникает из-за того, что x и y в первой функции НЕ являются теми же x и y второй функции. В первой функции у вас есть x типа начального значения и y типа элементов коллекции, и вы возвращаете результат типа x. Во второй функции ваши два параметра имеют одинаковый тип исходного значения.
Надеюсь, теперь это станет яснее!