Добавить заголовок перед текстовым файлом при сохранении в Spark
У меня есть искровой код для обработки файла csv. На нем делается некоторое преобразование. Теперь я хочу сохранить этот RDD как файл csv и добавить заголовок. Каждая строка этого RDD уже отформатирована правильно.
Я не уверен, как это сделать. Я хотел сделать соединение с строкой заголовка и моим RDD, но строка заголовка не является RDD, поэтому она не работает.
Ответы
Ответ 1
Вы можете сделать RDD из строки заголовка, а затем union
it, yes:
val rdd: RDD[String] = ...
val header: RDD[String] = sc.parallelize(Array("my,header,row"))
header.union(rdd).saveAsTextFile(...)
Затем вы получите кучу part-xxxxx
файлов, которые вы объедините.
Проблема в том, что я не думаю, что вам гарантировано, что заголовок будет первым разделом и, следовательно, окажется в part-00000
и в верхней части вашего файла. На практике я уверен, что это будет.
Более надежным было бы использовать команды Hadoop, такие как hdfs
, чтобы объединить файлы part-xxxxx
, и как часть команды просто введите строку заголовка из файла.
Ответ 2
Некоторая помощь при написании без соединения (Поставляется заголовок во время слияния)
val fileHeader ="This is header"
val fileHeaderStream: InputStream = new ByteArrayInputStream(fileHeader.getBytes(StandardCharsets.UTF_8));
val output = IOUtils.copyBytes(fileHeaderStream,out,conf,false)
Теперь прокрутите фрагменты файлов, чтобы записать полный файл, используя
val in: DataInputStream = ...<data input stream from file >
IOUtils.copyBytes(in, output, conf, false)
Это сделало для меня уверенным, что заголовок всегда приходит как первая строка, даже если вы используете "coalasec/repartition" для эффективной записи
Ответ 3
def addHeaderToRdd(sparkCtx: SparkContext, lines: RDD[String], header: String): RDD[String] = {
val headerRDD = sparkCtx.parallelize(List((-1L, header))) // We index the header with -1, so that the sort will put it on top.
val pairRDD = lines.zipWithIndex()
val pairRDD2 = pairRDD.map(t => (t._2, t._1))
val allRDD = pairRDD2.union(headerRDD)
val allSortedRDD = allRDD.sortByKey()
return allSortedRDD.values
}
Ответ 4
Слегка дифференцированный подход с Spark SQL
Из вопроса: теперь я хочу сохранить этот RDD как файл CSV и добавить заголовок. Каждая строка этого RDD уже отформатирована правильно.
С Spark 2.x у вас есть несколько вариантов конвертировать RDD в DataFrame
val rdd = .... //Assume rdd properly formatted with case class or tuple
val df = spark.createDataFrame(rdd).toDF("col1", "col2", ... "coln")
df.write
.format("csv")
.option("header", "true") //adds header to file
.save("hdfs://location/to/save/csv")
Теперь мы можем использовать Spark SQL DataFrame для загрузки, преобразования и сохранения CSV файла
Ответ 5
spark.sparkContext.parallelize(Seq(SqlHelper.getARow(temRet.columns,
temRet.columns.length))).union(temRet.rdd).map(x =>
x.mkString("\x01")).coalesce(1, true).saveAsTextFile(retPath)
object SqlHelper {
//create one row
def getARow(x: Array[String], size: Int): Row = {
var columnArray = new Array[String](size)
for (i <- 0 to (size - 1)) {
columnArray(i) = x(i).toString()
}
Row.fromSeq(columnArray)
}
}