Объясните совокупную функциональность в Spark
Я ищу более подробное объяснение совокупной функциональности, доступной через искру в python.
Пример, который у меня есть, следующий (используя pyspark из версии Spark 1.2.0)
sc.parallelize([1,2,3,4]).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Вывод:
(10, 4)
Я получаю ожидаемый результат (10,4)
, который представляет собой сумму 1+2+3+4
и 4 элемента. Если я изменил начальное значение, переданное агрегированной функции на (1,0)
из (0,0)
, я получаю следующий результат
sc.parallelize([1,2,3,4]).aggregate(
(1, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Вывод:
(19, 4)
Значение увеличивается на 9. Если я изменю его на (2,0)
, значение будет (28,4)
и т.д.
Может кто-нибудь объяснить мне, как рассчитывается это значение? Я ожидал, что значение увеличится на 1, а не на 9, ожидая увидеть (11,4)
вместо этого, я вижу (19,4)
.
Ответы
Ответ 1
У меня недостаточно очков репутации, чтобы прокомментировать предыдущий ответ Маасга. На самом деле нулевое значение должно быть "нейтральным" по отношению к seqop, что означает, что оно не будет мешать результату seqop, например, 0 к сложению или 1 к *;
Никогда не пытайтесь использовать ненейтральные значения, так как они могут применяться произвольное время. Это поведение связано не только с числом разделов.
Я попробовал тот же эксперимент, как указано в вопросе. с 1 разделом нулевое значение применялось 3 раза. с 2 перегородками, 6 раз. с 3 разделами, 9 раз, и это будет продолжаться.
Ответ 2
Я не был полностью убежден в принятом ответе, и ответ JohnKnight помог, поэтому здесь моя точка зрения:
Прежде всего, позвольте объяснить aggregate() моими собственными словами:
Прототип:
aggregate(zeroValue, seqOp, combOp)
Описание:
aggregate()
позволяет вам взять СДР и сгенерировать одно значение, отличающееся от того, которое было сохранено в исходном СДР.
Параметры:
zeroValue
: значение инициализации, для вашего результата, в желаемом
формат.
seqOp
: операция, которую вы хотите применить к записям RDD. Работает один раз для
каждая запись в разделе.
combOp
: определяет, как получаются объекты (по одному на каждый раздел),
объединяется.
Пример:
Вычислить сумму списка и длину этого списка. Вернуть результат в паре (sum, length)
.
В оболочке Spark я сначала создал список из 4 элементов с 2 разделами:
listRDD = sc.parallelize([1,2,3,4], 2)
Затем я определил свой seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
и мой combOp:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
а потом я агрегировал:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
Как видите, я дал описательные имена своим переменным, но позвольте мне объяснить это подробнее:
Первый раздел имеет подсписок [1, 2]. Мы будем применять seqOp к каждому элементу этого списка, и это приведет к локальному результату, паре (sum, length)
, который будет отражать результат локально, только в этом первом разделе.
Итак, начнем: local_result
инициализируется параметром zeroValue
, которому мы предоставили aggregate()
, т.е. (0, 0) и list_element
- первый элемент списка, т.е. 1. В результате это что происходит:
0 + 1 = 1
0 + 1 = 1
Теперь локальный результат равен (1, 1), это означает, что до сих пор для 1-го раздела после обработки только первого элемента сумма равна 1 и длине 1. Обратите внимание, что local_result
получает обновлено с (0, 0) до (1, 1).
1 + 2 = 3
1 + 1 = 2
и теперь локальный результат - (3, 2), который будет окончательным результатом 1-го раздела, поскольку они не являются другими элементами в подсписке 1-го раздела.
Проделав то же самое для второго раздела, мы получим (7, 2).
Теперь мы применяем combOp к каждому локальному результату, чтобы мы могли сформировать окончательный глобальный результат, например так: (3,2) + (7,2) = (10, 4)
Пример описан в 'figure':
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
Вдохновлен этим великолепным примером.
Так что теперь, если zeroValue
не (0, 0), а (1, 0), можно ожидать, что (8 + 4, 2 + 2) = (12, 4), что не объясняет, что вы испытываете, Даже если мы изменим количество разделов в моем примере, я не смогу получить это снова.
Ключевым моментом здесь является ответ JohnKnight, в котором говорится, что zeroValue
не только аналогичен количеству разделов, но может применяться больше раз, чем вы ожидаете.
Ответ 3
Агрегат позволяет преобразовывать и комбинировать значения RDD по желанию.
Он использует две функции:
Первый преобразует и добавляет элементы исходной коллекции [T] в локальную совокупность [U] и принимает вид: (U, T) = > U. Вы можете видеть это как складку, и поэтому она также для этой операции требуется ноль. Эта операция применяется локально к каждому разделу параллельно.
Вот где ключ заключается в следующем: единственное значение, которое следует использовать здесь, - это значение ZERO для операции сокращения.
Эта операция выполняется локально на каждом разделе, поэтому добавление чего-либо к этому нулевому значению добавит к результату, умноженному на количество разделов RDD.
Вторая операция принимает 2 значения типа результата предыдущей операции [U] и объединяет ее в одно значение. Эта операция уменьшит частичные результаты каждого раздела и приведет к фактической сумме.
Например:
Учитывая RDD строк:
val rdd:RDD[String] = ???
Предположим, вы хотите создать совокупность длины строк в этом RDD, чтобы вы сделали:
1) Первая операция преобразует строки в размер (int) и накапливает значения для размера.
val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`
2) обеспечивают ZERO для операции добавления (0)
val ZERO = 0
3) операцию для добавления двух целых чисел:
val add: (Int, Int) => Int = _ + _
Объединяя все это:
rdd.aggregate(ZERO, stringSizeCummulator, add)
Итак, зачем нужен ZERO?
Когда функция куммулятора применяется к первому элементу раздела, нет текущей суммы. Здесь используется ZERO.
Eg. Мой RDD:
- Раздел 1: [ "Перейти", "над" ]
- Раздел 2: [ "the", "wall" ]
Это приведет к:
Р1:
- stringSizeCummulator (ZERO, "Jump" ) = 4
- stringSizeCummulator (4, "over" ) = 8
P2:
- stringSizeCummulator (ZERO, "the" ) = 3
- stringSizeCummulator (3, "wall" ) = 7
Уменьшить: добавить (P1, P2) = 15
Ответ 4
Вы можете использовать следующий код (в scala), чтобы точно увидеть, что делает aggregate
. Он создает дерево всех операций сложения и слияния:
sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]
val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)
И затем в оболочке:
scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))
Итак, мы имеем эти 3 раздела: [4], [1,2] и [3].
scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))
Вы можете представить результат как дерево:
+
| \__________________
+ +
| \________ | \
+ + + 2
| \ | \ | \
0 + 0 3 0 1
| \
0 4
Вы можете видеть, что первый нулевой элемент создается в драйвере node (слева от дерева), а затем результаты для всех разделов объединяются один за другим. Вы также видите, что если вы замените 0 на 1, как и в своем вопросе, он добавит по 1 к каждому результату в каждом разделе, а также добавит 1 к исходному значению в драйвере. Таким образом, общее количество времени, которое вы даете нулю, используется:
number of partitions + 1
.
Итак, в вашем случае результат
aggregate(
(X, Y),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
будет:
(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)
Реализация aggregate
довольно проста. Он определен в RDD.scala, строка 1107:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
Ответ 5
Великие объяснения, это действительно помогло мне понять нижнюю работу агрегатной функции. Я играл с ним некоторое время и узнал, как показано ниже.
-
если вы используете acc as (0,0), то он не изменит результат выхода из функции.
-
если исходный аккумулятор изменен, тогда он обработает результат, как показано ниже
[сумма элементов RDD + начальное значение * Количество разделов RDD + acc начальное значение]
для вопроса здесь, я бы предложил проверить разделы, так как количество разделов должно быть 8 согласно моему пониманию, так как каждый раз, когда мы обрабатываем seq op в разделе RDD, он начинается с начальной суммы результата acc а также когда он будет делать гребень Op, он снова будет использовать начальное значение acc.
например. Список (1,2,3,4) и acc (1,0)
Получить разделы в scala с помощью RDD.partitions.size
если разделы являются 2, а число элементов равно 4, то = > [10 + 1 * 2 + 1] = > (13,4)
если раздел равен 4, а число элементов равно 4, тогда = > [10 + 1 * 4 + 1] = > (15,4)
Надеюсь, что это поможет, вы можете проверить здесь для объяснения. Спасибо.
Ответ 6
Для людей, которые ищут Scala Эквивалентный код для приведенного выше примера - вот оно. Та же логика, тот же ввод/результат.
scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21
scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)
scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)
Ответ 7
Я пробую много экспериментов по этому вопросу. Лучше установить множество разделов для агрегата. seqOp обработает каждый партион и применит начальное значение, что еще больше, combOp также применит начальное значение при объединении всех разделов.
Итак, я представляю формат для этого вопроса:
final result = sum(list) + num_Of_Partitions * initial_Value + 1
Ответ 8
Благодаря гсамарас.
Мой обзор, как показано ниже, ![enter image description here]()
Ответ 9
Я объясню концепцию Агрегатной операции в Spark следующим образом:
Определение агрегатной функции
**def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)
val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4)
→ 4 представляет количество разделов, доступных в нашем кластере Spark.
Следовательно, rdd распределяется на 4 раздела как:
11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16
мы делим постановку задачи на две части:
Первая часть проблемы заключается в агрегировании общего количества цветов, собранных в каждом квадранте; что агрегация последовательности внутри раздела
11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25 +16 = 65
Вторая часть проблемы заключается в суммировании этих отдельных агрегатов по разделам; что агрегация между разделами.
36 + 75 + 108 + 65 = 284
Сумма, хранящаяся в СДР, может в дальнейшем использоваться и обрабатываться для любого вида преобразования или другого действия
Таким образом, код становится как:
val sum = flowers.aggregate(0)((acc, value) => (acc + value), (x,y) => (x+y))
или
val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284
Пояснение: (0) - это аккумулятор
Первый + - сумма внутри раздела, добавляющая общее количество цветов, собранных каждым сборщиком в каждом квадранте сада.
Второй + - это сумма между разбиениями, которая объединяет итоговые суммы по каждому квадранту.
Случай 1:
Предположим, если нам нужно уменьшить функции после начального значения. Что произойдет, если начальное значение не будет равно нулю? Если бы это было 4, например:
Число будет добавлено к каждому внутрираздельному агрегату, а также к межраздельному агрегату:
Итак, первый расчет будет следующим:
11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25 +16 = 65 + 5 = 70
Здесь вычисление агрегации между разделами с начальным значением 5:
partition1 + partition2 + partition3+ partition4 + 5 = 41 + 80 + 113 + 70 = 309
Итак, перейдем к вашему запросу: Сумма может быть рассчитана на основе количества разделов, на которые распределяются данные rdd. Я думал, что ваши данные распространяются, как показано ниже, и поэтому у вас есть результат как (19, 4). Таким образом, при выполнении агрегатной операции необходимо указывать номер значения раздела:
val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
результат:
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)
Объяснение: Поскольку ваши данные распределены по 8 разделам, результат будет похож (с использованием вышеописанной логики)
добавление внутри раздела:
0+1=1
1+1=2
0+1=1
2+1=3
0+1=1
3+1=4
0+1=1
4+1=5
total=18
расчет между разделами:
18+1 (1+2+1+3+1+4+1+5+1) = 19
спасибо