Spark: RDD to List
У меня есть структура RDD
RDD[(String, String)]
и я хочу создать 2 списка (по одному для каждого измерения rdd).
Я попытался использовать rdd.foreach() и заполнить два ListBuffers, а затем преобразовать их в списки, но я думаю, каждый node создает свой собственный ListBuffer, потому что после итерации BufferLists пустые. Как я могу это сделать?
EDIT: мой подход
val labeled = data_labeled.map { line =>
val parts = line.split(',')
(parts(5), parts(7))
}.cache()
var testList : ListBuffer[String] = new ListBuffer()
labeled.foreach(line =>
testList += line._1
)
val labeledList = testList.toList
println("rdd: " + labeled.count)
println("bufferList: " + testList.size)
println("list: " + labeledList.size)
и результат:
rdd: 31990654
bufferList: 0
list: 0
Ответы
Ответ 1
Если вы действительно хотите создать два списка, то есть вы хотите, чтобы все распределенные данные были собраны в приложение драйвера (рискуя медленностью или OutOfMemoryError
) - вы можете использовать collect
, а затем использовать простые операции map
на результат:
val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)
В качестве альтернативы - если вы хотите "разбить" ваш RDD на два RDD - он очень похож, не собирая данные:
rdd.cache() // to make sure calculation of rdd is not repeated twice
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)
Третий вариант состоит в том, чтобы сначала отобразить эти два RDD, а затем собрать каждый из них, но он не сильно отличается от первого варианта и страдает от тех же рисков и ограничений.
Ответ 2
Как альтернатива Tzach Zohar ответ, вы можете использовать unzip
в списках:
scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)
Или keys
и values
на RDD
s:
scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33
scala> rdd1.foreach{println}
a
c
scala> rdd2.foreach{println}
d
b
Ответ 3
Любая идея, почему списки будут пустыми после итерации? Я сталкиваюсь с той же проблемой.