Искра при объединении много RDD выдает ошибку
Когда я использую "++" для объединения большого количества RDD, у меня возникла ошибка с ошибкой потока.
Искра версия 1.3.1
Окружающая среда: пряжа-клиент. --driver-memory 8G
Число RDD больше 4000. Каждый RDD считывается из текстового файла размером 1 ГБ.
Он генерируется таким образом
val collection = (for (
path <- files
) yield sc.textFile(path)).reduce(_ union _)
Он отлично работает, когда files
имеет небольшой размер.
И есть ошибка
Ошибка повторяется. Я думаю, это функция рекурсии, которая называется слишком много времени?
Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
.....
Ответы
Ответ 1
Используйте SparkContext.union(...)
, чтобы объединить много RDD сразу.
Вы не хотите делать это за один раз, так как RDD.union() создает новый шаг в линейке (дополнительный набор кадров стека при любом вычислении) для каждого RDD, тогда как SparkContext.union( ) делает все сразу. Это гарантирует, что вы не получите ошибку.
Ответ 2
Кажется, что когда объединение RDD по одному может попасть в серию очень длинных рекурсивных вызовов функций.
В этом случае нам нужно увеличить стек памяти JVM.
В искровом режиме с опцией --driver-java-options "-Xss 100M"
, память драйвера jvm установлена в 100M.
Решение Sean Owen также решает проблему более элегантным способом.