Почему Spark saveAsTable с bucketBy создает тысячи файлов?
Контекст
Spark 2.0.1, spark-submit в режиме кластера. Я читаю паркетный файл из hdfs:
val spark = SparkSession.builder
.appName("myApp")
.config("hive.metastore.uris", "thrift://XXX.XXX.net:9083")
.config("spark.sql.sources.bucketing.enabled", true)
.enableHiveSupport()
.getOrCreate()
val df = spark.read
.format("parquet")
.load("hdfs://XXX.XX.X.XX/myParquetFile")
Я сохраняю df
в таблице кустов с 50 сегментами, сгруппированными по userid
:
df0.write
.bucketBy(50, "userid")
.saveAsTable("myHiveTable")
Теперь, когда я смотрю на склад ульев в моих hdfs /user/hive/warehouse
, появляется папка с именем myHiveTable
. Внутри него находится куча файлов part-*.parquet
. Я ожидаю, что там будет 50 файлов. Но нет, есть 3201 файлов !!!! Есть 64 файла на раздел, почему? Существует разное количество файлов на разделы для разных файлов, которые я сохранил как таблицу кустов. Все файлы очень маленькие, всего десятки килобайт каждый!
Позвольте мне добавить, что количество различных userid
составляет около 1 000 000
в myParquetFile
.
Вопрос
Почему в папке 3201 файла, а не 50! Кто они такие?
Когда я читаю эту таблицу обратно в DataFrame и печатаю количество разделов:
val df2 = spark.sql("SELECT * FROM myHiveTable")
println(df2.rdd.getNumPartitions)
Количество разделов - это правильно 50, и я подтвердил, что данные правильно разделены userid
.
Для одного из моих больших наборов данных 3Tb я создаю таблицу с 1000 разделами, которая создала буквально ~ миллион файлов! Который превышает ограничение элемента каталога 1048576 и дает org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException
Вопрос
От чего зависит количество создаваемых файлов?
Вопрос
Есть ли способ ограничить количество создаваемых файлов?
Вопрос
Должен ли я беспокоиться об этих файлах? Влияет ли это на производительность df2
, имея все эти файлы? Всегда говорят, что мы не должны создавать слишком много разделов, потому что это проблематично.
Вопрос
Я нашел эту информацию Советы по динамическому разделению HIVE, что число файлов может быть связано с количеством картографов. Рекомендуется использовать distribute by
при вставке в таблицу улья. Как я мог сделать это в Spark?
Вопрос
Если проблема действительно такая, как в приведенной выше ссылке, здесь Как управлять номерами файлов таблицы кустов после вставки данных в MapR-FS, они предлагают использовать такие опции, как hive.merge.mapfiles
или hive.merge.mapredfiles
, чтобы объединить все маленькие файлы после карты уменьшают работу. Есть ли варианты для этого в Spark?
Ответы
Ответ 1
Пожалуйста, используйте spark sql, который будет использовать HiveContext для записи данных в таблицу Hive, поэтому он будет использовать количество сегментов, настроенных вами в схеме таблицы.
SparkSession.builder().
config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("hive.execution.engine","tez").
config("hive.exec.max.dynamic.partitions","400").
config("hive.exec.max.dynamic.partitions.pernode","400").
config("hive.enforce.bucketing","true").
config("optimize.sort.dynamic.partitionining","true").
config("hive.vectorized.execution.enabled","true").
config("hive.enforce.sorting","true").
enableHiveSupport().getOrCreate()
spark.sql(s"insert into hiveTableName partition (partition_column) select * from myParquetFile")
Реализация искры в сегментации не учитывает указанное количество блоков. Каждый раздел записывает в отдельные файлы, поэтому у вас много файлов для каждого сегмента.
Пожалуйста, обратитесь по этой ссылке https://www.slideshare.net/databricks/hive-bucketing-in-apache-spark-with-tejas-patil
Надеюсь, это поможет.
Ravi
Ответ 2
Мне удалось найти обходное решение (на Spark 2.1). Он решает проблему с количеством файлов, но может иметь некоторые последствия для производительности.
dataframe
.withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
.repartition(numBuckets, $"bucket")
.write
.format(fmt)
.bucketBy(numBuckets, "bucketColumn")
.sortBy("bucketColumn")
.option("path", "/path/to/your/table")
.saveAsTable("table_name")
Я думаю, что алгоритм искрового bucketing делает положительный мод MurmurHash3 значения столбца ковша. Это просто реплицирует эту логику и перераспределяет данные, чтобы каждый раздел содержал все данные для ведра.
Вы можете сделать то же самое с разделением + bucketing.
dataframe
.withColumn("bucket", pmod(hash($"bucketColumn"), lit(numBuckets)))
.repartition(numBuckets, $"partitionColumn", $"bucket")
.write
.format(fmt)
.partitionBy("partitionColumn")
.bucketBy(numBuckets, "bucketColumn")
.sortBy("bucketColumn")
.option("path", "/path/to/your/table")
.saveAsTable("table_name")
Протестировано с 3 разделами и 5 кодами локально с использованием формата csv (оба столбца разделов и столбцов - это просто цифры):
$ tree .
.
├── _SUCCESS
├── partitionColumn=0
│ ├── bucket=0
│ │ └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
│ ├── bucket=1
│ │ └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
│ ├── bucket=2
│ │ └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
│ ├── bucket=3
│ │ └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
│ └── bucket=4
│ └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
├── partitionColumn=1
│ ├── bucket=0
│ │ └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
│ ├── bucket=1
│ │ └── part-00004-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
│ ├── bucket=2
│ │ └── part-00002-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
│ ├── bucket=3
│ │ └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
│ └── bucket=4
│ └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
└── partitionColumn=2
├── bucket=0
│ └── part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00000.csv
├── bucket=1
│ └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00001.csv
├── bucket=2
│ └── part-00001-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00002.csv
├── bucket=3
│ └── part-00003-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00003.csv
└── bucket=4
└── part-00000-c2f2b7b5-40a1-4d24-8c05-084b7a05e399_00004.csv
Здесь bucket = 0 для всех 3 разделов (вы можете видеть, что они все одинаковые значения):
$ paste partitionColumn=0/bucket=0/part-00004-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=1/bucket=0/part-00002-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv partitionColumn=2/bucket=0/part-00000-5f860e5c-f2c2-4d52-8035-aa00e4432770_00000.csv | head
0 0 0
4 4 4
6 6 6
16 16 16
18 18 18
20 20 20
26 26 26
27 27 27
29 29 29
32 32 32
Мне действительно понравился дополнительный индекс ковша. Но если вы этого не сделаете, вы можете сбросить столбец bucket прямо перед записью, и вы получите numBuckets количество файлов на раздел.