Ответ 1
Ну, давайте сделаем ваш набор данных немного интереснее:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
У нас есть шесть элементов:
rdd.count
Long = 6
нет разделителя:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
и восемь разделов:
rdd.partitions.length
Int = 8
Теперь давайте определим небольшой помощник для подсчета количества элементов в каждом разделе:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
Поскольку у нас нет разделителя, наш набор данных распределяется равномерно между разделами (Схема разбиения по умолчанию в Spark):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
Теперь давайте переделаем наш набор данных:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
Поскольку параметр, переданный в HashPartitioner
, определяет число разделов, мы ожидаем один раздел:
rddOneP.partitions.length
Int = 1
Поскольку у нас есть только один раздел, он содержит все элементы:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
Обратите внимание, что порядок значений после тасования не является детерминированным.
То же самое, если мы используем HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
мы получим 2 раздела:
rddTwoP.partitions.length
Int = 2
Так как rdd
разделяется по ключевым данным, они не будут распределяться равномерно:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
Потому что с тремя ключами и только двумя разными значениями hashCode
mod numPartitions
здесь нет ничего неожиданного:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
Только для подтверждения выше:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
Наконец, с HashPartitioner(7)
мы получим семь разделов, три непустых с двумя элементами:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
Сводка и примечания
-
HashPartitioner
принимает единственный аргумент, который определяет количество разделов Значения -
присваиваются разделам с помощью
hash
ключей. Функцияhash
может отличаться в зависимости от языка (Scala RDD может использоватьhashCode
,DataSets
использовать MurmurHash 3, PySpark,portable_hash
).В простом случае, когда ключ является маленьким целым числом, вы можете предположить, что
hash
является идентификатором (i = hash(i)
).Scala API использует
nonNegativeMod
для определения раздела на основе вычисленного хэша, -
Если распределение ключей не является однородным, вы можете оказаться в ситуациях, когда часть вашего кластера простаивает.
Клавиши -
должны быть хешируемыми. Вы можете проверить мой ответ на Список как ключ для PySpark reduceByKey, чтобы прочитать о проблемах PySpark. Другая возможная проблема выделяется Документация HashPartitioner:
Java-массивы имеют hashCodes, основанные на идентификаторах массивов, а не на их содержимом, поэтому попытка разбиения RDD [Array []] или RDD [(Array [], _)] с использованием HashPartitioner приведет к непредвиденным или неверный результат.
-
В Python 3 вы должны убедиться, что хеширование согласовано. См. Что делает исключение: случайность хеша строки должна быть отключена через значение PYTHONHASHSEED в pyspark?
-
Разделитель хэшей не является ни инъективным, ни сюръективным. Несколько ключей могут быть назначены одному разделу, а некоторые разделы могут оставаться пустыми.
-
Обратите внимание, что в настоящее время хэш-методы не работают в Scala в сочетании с определенными классами классов REPL (Равенство класса Case в Apache Spark).
-
HashPartitioner
(или любой другойPartitioner
) перетасовывает данные. Если секционирование не используется повторно между несколькими операциями, оно не уменьшает количество данных, которые нужно перетасовать.