Почему "Ошибка связи с MapOutputTracker" сообщается, когда Spark пытается отправить GetMapOutputStatuses?

Я использую Spark 1.3 для создания агрегации на большом количестве данных. Задача состоит из 4 шагов:

  • Прочитайте большой файл последовательности (1 ТБ) (соответствующий 1 день данных)
  • Отфильтруйте большую часть его и получите около 1 ГБ записи в случайном порядке.
  • keyBy клиент
  • aggregateByKey() для настраиваемой структуры, которая создает профиль для этого клиента, что соответствует HashMap [Long, Float] для каждого клиента. Длинные ключи уникальны и не превышают 50 тыс. Записей.

Я запускаю это с помощью этой конфигурации:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \

И получив эту ошибку:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

Было показано, что работа и логика работают с небольшим набором тестов, и я могу даже запустить это задание для некоторых дат, но не для других. Я googled вокруг и нашел подсказки, что "Ошибка связи с MapOutputTracker" связана с внутренними сообщениями Spark, но я уже увеличил "spark.akka.frameSize", "spark.akka.timeout" и "spark.akka.askTimeout" ( этот последний даже не появляется в документации Spark, но упоминается в списке рассылки Spark), но безрезультатно. Есть еще некоторый тайм-аут, продолжающийся через 30 секунд, что я не знаю, как идентифицировать или исправить.

Я не вижу причин для отказа из-за размера данных, поскольку операция фильтрации и факт, что aggregateByKey выполняет локальные частичные агрегации, должны быть достаточными для определения размера данных. Количество задач - 16K (автоматическое от исходного ввода), намного больше, чем 800 ядер, которые работают на 100 исполнителей, поэтому это не так просто, как обычный "приращивание разделов". Любые подсказки будут очень признательны! Спасибо!

Ответы

Ответ 1

У меня была аналогичная проблема, что моя работа будет работать отлично с меньшим набором данных, но не с большими.

После большого количества изменений конфигурации я обнаружил, что изменение параметров памяти драйвера оказывает гораздо большее влияние, чем изменение настроек памяти исполнителя. Также полезно использовать новый сборщик мусора. Я использую следующую конфигурацию для кластера из 3 с 40 ядрами каждый. Надеемся, что следующая конфигурация поможет:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -  
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g 
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -
XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g   
-XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g

Ответ 2

Что происходит в драйвере во время этого отказа? Это может быть из-за давления памяти на водителя, вызывающего его невосприимчивость. Если я правильно помню, MapOutputTracker, к которому он пытается добраться, когда он вызывает GetMapOutputStatuses, выполняется в драйвере драйвера Spark.

Если вы столкнулись с длинными GC или другими паузами по какой-либо причине в этом процессе, это приведет к исключениям, которые вы видите выше.

Некоторые вещи, которые нужно попробовать, - попытаться выполнить jtacking процесс драйвера, когда вы начнете видеть эти ошибки и посмотреть, что произойдет. Если jstack не отвечает, возможно, ваш драйвер недостаточно реагирует.

Задачи 16K звучат так, как было бы очень важно, чтобы драйвер отслеживал, любой шанс увеличить память драйвера за 4g?