Чрезвычайно медленное время записи S3 из EMR/Spark

Я пишу, чтобы узнать, знает ли кто, как ускорить S3 время записи от Spark, работающего в EMR?

Моя работа Spark занимает более 4 часов, однако кластер находится под нагрузкой только в течение первых 1,5 часов.

enter image description here

Мне было любопытно, что Спарк делал все это время. Я просмотрел журналы и нашел много команд s3 mv, по одному для каждого файла. Затем, взглянув прямо на S3, я вижу, что все мои файлы находятся в каталоге _temporary.

Вторично, меня беспокоит стоимость кластера, похоже, мне нужно купить 2 часа для вычисления этой конкретной задачи. Тем не менее, я в конечном итоге покупаю до 5 часов. Мне любопытно, может ли EMR AutoScaling помочь в стоимости в этой ситуации.

В некоторых статьях обсуждается изменение алгоритма компилятора вывода файлов, но с этим я мало успел.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Запись на локальную HDFS выполняется быстро. Мне любопытно, будет ли выдача команды hadoop для копирования данных на S3 быстрее?

enter image description here

Ответы

Ответ 1

То, что вы видите, это проблема с outputcommitter и s3. задание фиксации применяет fs.rename в папке _temporary, и поскольку S3 не поддерживает переименование, это означает, что один запрос теперь копирует и удаляет все файлы из _temporary в конечный пункт назначения.

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2") работает только с версией hadoop> 2.7. то, что он делает, заключается в том, чтобы скопировать каждый файл из _temporary в задачу фиксации и не выполнять задание, чтобы оно было распределено и работает довольно быстро.

Если вы используете более старую версию hadoop, я бы использовал Spark 1.6 и использовал:

sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

* Обратите внимание, что он не работает с включенным режимом или написанием в режиме добавления

** также обратите внимание, что он устарел в Spark 2.0 (заменен на algorithm.version = 2)

BTW в моей команде мы на самом деле пишем Spark для HDFS и используем задания DISTCP (в частности s3-dist-cp) в производстве для копирования файлов на S3, но это делается по нескольким другим причинам (согласованность, отказоустойчивость), поэтому нет необходимости.. вы можете написать на S3 довольно быстро, используя то, что я предложил.

Ответ 2

Прямой коммиттер был вытащен от искры, поскольку он не был устойчив к сбоям. Я бы настоятельно советовал использовать его.

В Hadoop, s3guard есть работа, чтобы добавить коммиттеров 0-rename, которые будут O (1) и отказоустойчивы; следите за HADOOP-13786.

Игнорируя "волшебный коммиттер" на данный момент, начальный коммиттер на основе Netflix отправится первым (hadoop 2.9? 3.0?)

  1. Это записывает работу в локальную FS, в задачу commit
  2. выдает незафиксированные операции множественного ввода для записи данных, но не материализует их.
  3. сохраняет информацию, необходимую для фиксации PUT для HDFS, используя исходный компилятор вывода "алгоритм 1"
  4. Реализует фиксацию задания, которая использует фиксацию вывода файла HDFS для определения того, какие PUT должны завершиться, и которые нужно отменить.

Результат: фиксация задачи принимает данные/полосы пропускания в секундах, но фиксация задания занимает больше времени, чем время 1-4 GET в папке назначения и POST для каждого ожидающего файла, причем последний распараллеливается.

Вы можете подобрать коммиттера, на котором основана эта работа, от netflix и, вероятно, использовать его в иске сегодня. Устанавливайте алгоритм фиксации файла = 1 (должен быть по умолчанию) или он фактически не записывает данные.

Ответ 3

У меня был аналогичный случай использования, когда я использовал искру для записи в s3 и имел проблемы с производительностью. Основная причина заключалась в том, что искра создавала много файлов с нулевыми байтами, и замена файлов temp на фактическое имя файла замедляла процесс записи. Пытался подходить под работу как работа

  1. Запись вывода искры в HDFS и использование Hive для записи на s3. Производительность была намного лучше, поскольку улей создавал меньше файлов деталей. Проблема у меня была (также была такая же проблема при использовании искры), исключить действие в отношении политики не было в prod env из соображений безопасности. Ведро S3 было зашифровано в моем случае.

  2. Запись искрового выхода в HDFS и скопированные файлы hdfs в локальную и использованную aws s3 копию для передачи данных на s3. Получил второй лучший результат при таком подходе. Создал билет с Amazon, и они предложили пойти с этим.

  3. Используйте s3 dist cp для копирования файлов с HDFS на S3. Это работало без проблем, но не было

Ответ 4

Что вы видите в искровом исходе? Если вы видите много операций переименования, прочитайте это

Ответ 5

Мы испытали то же самое на Azure, используя Spark на WASB. Наконец, мы решили не использовать ограниченное хранилище непосредственно с искру. Мы сделали spark.write для реального hdfs://назначения и разработали специальный инструмент, который делает: hasoop copyFromLocal hdfs://wasb://HDFS - это наш временный буфер до архивации до WASB (или S3).

Ответ 6

Насколько велик файл, который вы пишете? Наличие одного основного письма в очень большом файле будет намного медленнее, чем разделение файла и многократные работники выписывают меньшие файлы.