Ответ 1
Я закончил обновление моей искровой версии, и проблема была решена.
Выполнение искрового задания с использованием scala, как и ожидалось, все задания заканчиваются вовремя, но как-то некоторые журналы INFO печатаются в течение 20-25 минут до остановки задания.
Проводка нескольких снимков пользовательского интерфейса, которые могут помочь устранить проблему.
Я не понимаю, почему так много времени проводилось между обеими идентификаторами работы.
Ниже приведен фрагмент кода:
val sc = new SparkContext(conf)
for (x <- 0 to 10) {
val zz = getFilesList(lin);
val links = zz._1
val path = zz._2
lin = zz._3
val z = sc.textFile(links.mkString(",")).map(t => t.split('\t')).filter(t => t(4) == "xx" && t(6) == "x").map(t => titan2(t)).filter(t => t.length > 35).map(t => ((t(34)), (t(35), t(5), t(32), t(33))))
val way_nodes = sc.textFile(way_source).map(t => t.split(";")).map(t => (t(0), t(1)));
val t = z.join(way_nodes).map(t => (t._2._1._2, Array(Array(t._2._1._2, t._2._1._3, t._2._1._4, t._2._1._1, t._2._2)))).reduceByKey((t, y) => t ++ y).map(t => process(t)).flatMap(t => t).combineByKey(createTimeCombiner, timeCombiner, timeMerger).map(averagingFunction).map(t => t._1 + "," + t._2)
t.saveAsTextFile(path)
}
sc.stop()
Несколько дополнительных действий: spark-1.4.1 saveAsTextFile to S3 очень медленный на emr-4.0.0
Я закончил обновление моей искровой версии, и проблема была решена.
Как я добавил в комментарии, я рекомендую использовать пакет spark-csv вместо sc.saveAsTextFile
, и нет проблем с записью непосредственно на s3 с помощью этого пакета:)
Я не знаю, используете ли вы s3 или s3n, но можете попробовать переключиться. У меня возникли проблемы с использованием s3a на Spark 1.5.2 (EMR-4.2), где записи все время выходят из строя и переключение на s3 решило проблему, поэтому стоит попробовать.
Несколько других вещей, которые должны ускорить запись в s3, - это использовать DirectOutputCommiter
conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter")
и отключить генерацию файлов _SUCCESS:
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
Обратите внимание, что отключение файлов _SUCCESS должно быть установлено в конфигурации hadoop SparkContext
, а не на SparkConf
.
Надеюсь, это поможет.
У меня была такая же проблема при записи файлов на S3. Я использую версию искры 2.0, чтобы дать вам обновленный код для подтвержденного ответа.
В Spark 2.0 вы можете использовать
val spark = SparkSession.builder().master("local[*]").appName("App_name").getOrCreate()
spark.conf.set("spark.hadoop.mapred.output.committer.class","com.appsflyer.spark.DirectOutputCommitter")
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
Это решило мою проблему с удалением работы