Spark: увеличить количество разделов, не вызывая перетасовки?
При уменьшении количества разделов можно использовать coalesce
, что отлично, потому что оно не вызывает перетасовку и, кажется, работает мгновенно (не требуется дополнительный этап задания).
Я хотел бы иногда делать обратное, но repartition
вызывает перетасовку. Я думаю, что несколько месяцев назад я действительно получил эту работу, используя CoalescedRDD
с balanceSlack = 1.0
- так что произойдет, так это разделить раздел так, чтобы результирующие разделы были расположены там, где все на одном и том же node (так малое сетевое IO).
Такая функциональность в Hadoop автоматизирована, а просто изменяет размер разделения. Кажется, что он не работает таким образом в Spark, если не уменьшается количество разделов. Я думаю, что решение может состоять в том, чтобы написать пользовательский разделитель вместе с настраиваемым RDD, где мы определяем getPreferredLocations
... но я думал, что это такая простая и распространенная вещь, что, безусловно, должен быть прямой способ сделать это?
Проверенные вещи:
.set("spark.default.parallelism", partitions)
на моем SparkConf
, и когда в контексте чтения паркета я пробовал sqlContext.sql("set spark.sql.shuffle.partitions= ...
, который на 1.0.0 вызывает ошибку И не хочу, чтобы я хотел, я хочу, чтобы номер раздела изменялся все виды работы, а не просто перетасовки.
Ответы
Ответ 1
Смотрите это пространство
https://issues.apache.org/jira/browse/SPARK-5997
Этот вид действительно простой очевидной функции в конечном итоге будет реализован - я думаю, сразу после того, как они завершат все ненужные функции в Dataset
s.
Ответ 2
Я не совсем понимаю, что вы думаете. Вы имеете в виду, что у вас сейчас 5 разделов, но после следующей операции вам нужны данные, распределенные до 10? Поскольку наличие 10, но все еще использование 5 не имеет большого смысла... Процесс отправки данных в новые разделы должен произойти когда-нибудь.
При выполнении coalesce
вы можете избавиться от несанкционированных разделов, например: если вы изначально имели 100, но затем после reduceByKey вы получили 10 (как там, где всего 10 ключей), вы можете установить coalesce
.
Если вы хотите, чтобы процесс прошел другим путем, вы можете просто заставить какое-то разделение:
[RDD].partitionBy(new HashPartitioner(100))
Я не уверен, что вы ищете, но надеюсь на это.