FetchFailedException или MetadataFetchFailedException при обработке большого набора данных

Когда я запускаю код синтаксического анализа с 1-гигабайтным набором данных, он завершается без какой-либо ошибки. Но, когда я пытаюсь получить 25 Гбайт данных за один раз, я получаю ниже ошибок. Я пытаюсь понять, как я могу избежать ошибок ниже. Приятно слышать любые предложения или идеи.

Ошибки с ошибкой,

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-xxxxxxxx

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/mnt/yarn/nm/usercache/xxxx/appcache/application_1450751731124_8446/blockmgr-8a7b17b8-f4c3-45e7-aea8-8b0a7481be55/08/shuffle_0_224_0.data, offset=12329181, length=2104094}

Детали кластера:

Пряжа: 8 узлов
Всего ядер: 64
Память: 500 ГБ
Искры версии: 1.5

Операция отправки Spark:

spark-submit --master yarn-cluster \
                        --conf spark.dynamicAllocation.enabled=true \
                        --conf spark.shuffle.service.enabled=true \
                        --executor-memory 4g \
                        --driver-memory 16g \
                        --num-executors 50 \
                        --deploy-mode cluster \
                        --executor-cores 1 \
                        --class my.parser \
                        myparser.jar \
                        -input xxx \
                        -output xxxx \

Одна из трассировки стека:

at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Ответы

Ответ 1

Эта ошибка почти гарантированно вызвана проблемами памяти ваших исполнителей. Я могу придумать пару способов решения этих проблем.

1) Вы можете попробовать запустить с большим количеством разделов (сделайте repartition на dataframe). Проблемы с памятью обычно возникают, когда один или несколько разделов содержат больше данных, чем вписываются в память.

2) Я замечаю, что вы явно не установили spark.yarn.executor.memoryOverhead, поэтому по умолчанию будет max(386, 0.10* executorMemory), который в вашем случае будет 400 МБ. Это звучит низко для меня. Я попытался бы увеличить его, чтобы сказать 1 ГБ (учтите, что если вы увеличиваете memoryOverhead до 1 ГБ, вам нужно опустить --executor-memory до 3 ГБ)

3) Посмотрите в файлах журнала на сбоях узлов. Вы хотите найти текст "Killing container". Если вы видите, что текст "выходит за пределы физической памяти", увеличение памяти. На мой взгляд, проблема с памятью будет решена.

Ответ 2

У меня также были хорошие результаты, увеличив время ожидания Spark spark.network.timeout до большего значения, например 800. По умолчанию 120 секунд заставит многих ваших исполнителей тайм-аут при большой нагрузке.

Ответ 3

Хорошо, это старая нить, и в Stackoverflow есть довольно много ответов, но я потерял пару дней до этой ошибки, и я думаю, что совместная история может помочь.

На самом деле это может быть несколько способов. Как упоминал Гленни, это скорее всего проблема памяти, поэтому убедитесь, что у вас достаточно памяти для всего. Есть память контейнера, память AM, память памяти, сокращение памяти и т.д. Конфигурации, которые нужно учитывать. Чтение this может помочь вам найти правильные конфигурации. Вы должны сами выбирать номера, но вот несколько свойств, которые я установил.

пряжи site.xml

<property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>32768</value>
</property>

<property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>4096</value>
</property>

<property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>4096</value>
</property>

mapred-site.xml

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>4096</value>
</property>

<property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value>
</property>

Они могут исправить некоторые другие ошибки, с которыми вы могли столкнуться, например, сбой при работе с PySpark при запуске. Но в моем случае, хотя некоторые ошибки исчезли (например, ошибки MetadataFetchFailed), проблема не устранена. Точная ошибка:

org.apache.spark.shuffle.FetchFailedException: не удалось подключиться к DB-ЕТ-С/х.х.х.м: 34085

После игры со всеми возможными свойствами YARN и Spark от тайм-аутов Spark к сервису Shuffle YARN я понял, что в журналах ошибок неудавшийся контейнер ищет x.x.x.x, локальный (внутренний) IP при запуске netstat -tulpn | grep <PORT NUM> вернулся yyyy: 34085, в котором yyyy является внешним IP-адресом. Это не проблема памяти, это была проблема с настройкой сети.

Служба Spark была привязана только к внешнему интерфейсу, поскольку имя хоста было связано с внешним IP-адресом в /etc/hosts. После обновления файла /etc/hosts проблема была исправлена.

Нижняя линия: Очевидно, ошибка говорит о том, что какой-то контейнер не может достичь другого. Обычно это связано с неудачными контейнерами из-за проблем с памятью, но также может быть сетевой проблемой, поэтому следите за ними, особенно если у вас есть несколько интерфейсов на ваших узлах.

Ответ 4

В дополнение к проблемам с памятью и сетевыми конфигурациями, описанным выше, стоит отметить, что для больших таблиц (например, несколько ТБ здесь) может возникнуть проблема org.apache.spark.shuffle.FetchFailedException из-за тайм-аута, получающего перетасованные разделы. Чтобы устранить эту проблему, вы можете установить следующее:

SET spark.reducer.maxReqsInFlight=1;  -- Only pull one file at a time to use full network bandwidth.
SET spark.shuffle.io.retryWait=60s;  -- Increase the time to wait while retrieving shuffle partitions before retrying. Longer times are necessary for larger files.
SET spark.shuffle.io.maxRetries=10;