Поддерживает ли Spark разделение подрезки с помощью паркетных файлов

Я работаю с большим набором данных, который разбивается на два столбца - plant_name и tag_id. Второй раздел - tag_id имеет 200000 уникальных значений, и я в основном получаю доступ к данным по определенным значениям tag_id. Если я использую следующие команды Spark:

sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'")

Я бы ожидал быстрого ответа, так как это разрешает один раздел. В Hive и Presto это занимает несколько секунд, однако Spark работает в течение нескольких часов.

Фактические данные хранятся в ведре S3, и когда я отправляю запрос sql, Spark отключается и сначала получает все разделы из метастара Hive (200000 из них), а затем вызывает refresh(), чтобы принудительно выполнить полный список всех этих файлов в хранилище объектов S3 (фактически вызывающий listLeafFilesInParallel).

Эти две операции настолько дороги, есть ли какие-либо настройки, которые могут заставить Spark обрезать разделы раньше - либо во время вызова в хранилище метаданных, либо сразу после этого?

Ответы

Ответ 1

Да, искра поддерживает разделение разделов.

Spark делает список каталогов разделов (последовательный или параллельный listLeafFilesInParallel) для создания кеша всех разделов в первый раз. Запросы в том же приложении, которые сканируют данные, используют этот кеш. Таким образом, медленность, которую вы видите, может быть связана с этим построением кеша. Последующие запросы, которые сканируют данные, используют кеш для сокращения разделов.

Это журналы, в которых перечислены разделы, которые перечислены для заполнения кеша.

App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-01 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-02 on driver
App > 16/11/14 10:45:24 main INFO ParquetRelation: Listing s3://test-bucket/test_parquet_pruning/month=2015-03 on driver

Это журналы, показывающие, что происходит обрезка.

App > 16/11/10 12:29:16 main INFO DataSourceStrategy: Selected 1 partitions out of 20, pruned 95.0% partitions.

Обратитесь convertToParquetRelation и getHiveQlPartitions в HiveMetastoreCatalog.scala.

Ответ 2

Просто мысль:

Документация API Spark для HadoopFsRelation гласит: (https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/sources/HadoopFsRelation.html)

"... при чтении из таблиц стилей стиля Hive, хранящихся в файле системы, он способен обнаруживать информацию о секционировании с путей каталогов ввода и выполнить разделение разделов перед запуском чтение данных..."

Итак, я думаю, что "listLeafFilesInParallel" не может быть проблемой.

Аналогичная проблема уже находится в искрах jira: https://issues.apache.org/jira/browse/SPARK-10673

Несмотря на то, что параметр spark.sql.hive.verifyPartitionPath установлен в false, и нет эффекта в производительности, я подозреваю, что проблема может быть вызвана незарегистрированными разделами. Просьба перечислить разделы таблицы и проверить, все ли разделы зарегистрированы. Else, восстановите свои разделы, как показано в этой ссылке:

Куст не читает файлы секционированного паркета, созданные Spark

Update:

  • Я предполагаю, что при записи данных был установлен соответствующий размер паркета и размер страницы.

  • Создайте новую таблицу улей с указанными разделами и файл-формат в виде паркета, загрузите его из несегментированной таблицы, используя динамический подход к разделению. (https://cwiki.apache.org/confluence/display/Hive/DynamicPartitions) Запустите простой запрос на улей, а затем сравните, запустив искровую программу.

Отказ от ответственности: я не специалист по искрам/паркету. Проблема звучала интересно и, следовательно, ответила.