Как раздел Spark (ing) работает с файлами в HDFS?
Я работаю с Apache Spark в кластере с использованием HDFS. Насколько я понимаю, HDFS распространяет файлы на узлах данных. Поэтому, если поставить файловую систему "file.txt", она будет разделена на разделы.
Теперь я звоню
rdd = SparkContext().textFile("hdfs://.../file.txt")
от Apache Spark.
Имеет ли rdd автоматически те же разделы, что и "file.txt" в файловой системе?
Что происходит, когда я вызываю
rdd.repartition(x)
где x > , то разделы, используемые hdfs? Будет ли Spark физически переупорядочивать данные по hdfs для работы локально?
Пример:
Я поместил 30GB Textfile в HDFS-систему, которая распределяет его на 10 узлах.
Will Spark
а) использовать те же 10 партионов? и б) перетасовать 30 ГБ по кластеру при вызове перераспределения (1000)?
Ответы
Ответ 1
Когда Spark считывает файл из HDFS, он создает один раздел для одного разделения входа. Разделение входа устанавливается с помощью Hadoop InputFormat
, используемого для чтения этого файла. Например, если вы используете textFile()
, то в Hadoop будет TextInputFormat
, который вернет вам один раздел для одного блока HDFS (но разделение между разделами будет выполняться по расщеплению строки, а не по точной разбивке блоков), если у вас нет сжатого текстового файла. В случае с сжатым файлом вы получите один раздел для одного файла (поскольку сжатые текстовые файлы не разделяются).
Когда вы вызываете rdd.repartition(x)
, он будет выполнять перетасовку данных из N
partititons, которые у вас есть в rdd
до x
разделов, которые вы хотите иметь, разбиение будет выполняться с округлой структурой.
Если у вас есть несжатый текстовый файл 30 ГБ, хранящийся на HDFS, тогда с настройкой размера блока HDFS по умолчанию (128 МБ) он будет сохранен в 235 блоках, а это значит, что RDD, который вы прочитали из этого файла, будет иметь 235 разделов. Когда вы вызываете repartition(1000)
, ваш RDD будет отмечен как подлежащий перераспределению, но на самом деле он будет перетасован на 1000 разделов только тогда, когда вы выполните действие поверх этого RDD (ленивая концепция выполнения)
Ответ 2
Вот снимок " Как блоки в HDFS загружаются в рабочие места Spark как разделы"
В этих изображениях 4 блока HDFS загружаются как разделы Spark внутри 3 рабочих памяти
![Dataset in HDFS broken into partitions]()
Пример: я поместил 30GB текстовый файл в HDFS-систему, которая распределяет его на 10 узлах.
Будет ли Spark
a) использовать те же 10 разделов?
Spark загружает те же самые 10 наборов HDFS для рабочей памяти как разделы. Я предполагаю, что размер блока размером 30 ГБ должен быть 3 ГБ, чтобы получить 10 разделов/блоков (со стандартным conf)
b) перетасовать 30 ГБ по кластеру, когда я вызываю перераспределение (1000)?
Да, Spark перемещает данные между рабочими узлами, чтобы создать 1000 разделов в рабочей памяти.
Примечание:
HDFS Block -> Spark partition : One block can represent as One partition (by default)
Spark partition -> Workers : Many/One partitions can present in One workers
Ответ 3
Дополнение к @0x0FFF Если взять HDFS в качестве входного файла, он будет рассчитываться как для этого rdd = SparkContext().textFile("hdfs://.../file.txt")
, а когда вы сделаете rdd.getNumPatitions
, это приведет к Max(2, Number of HDFS block)
. Я провел много экспериментов и нашел это в результате. Снова явным образом вы можете сделать rdd = SparkContext().textFile("hdfs://.../file.txt", 400)
, чтобы получить 400 в качестве разделов или даже сделать переразделы на rdd.repartition
или уменьшить до 10 на rdd.coalesce(10)
Ответ 4
При чтении файлов HDFS без буфера (например, паркета) с помощью spark-sql количество разделов df.rdd.getNumPartitions
зависит от следующих факторов:
-
spark.default.parallelism
(примерно переводит в #core, доступные для приложения) -
spark.sql.files.maxPartitionBytes
(по умолчанию 128 МБ) -
spark.sql.files.openCostInBytes
(по умолчанию 4 МБ)
Примерная оценка количества перегородок:
-
Если у вас достаточно ядер для чтения всех ваших данных параллельно (то есть как минимум одно ядро на каждые 128 МБ ваших данных)
AveragePartitionSize ≈ min(4MB, TotalDataSize/#cores) NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize
-
Если вам не хватает ядер,
AveragePartitionSize ≈ 128MB NumberOfPartitions ≈ TotalDataSize/AveragePartitionSize
Точные вычисления немного сложны и могут быть найдены в базе кода для FileSourceScanExec, см. Здесь.