Как писать в CSV в Spark
Я пытаюсь найти эффективный способ сохранить результат работы Spark Job в виде файла csv. Я использую Spark с Hadoop, и до сих пор все мои файлы сохраняются как part-00000
.
Любые идеи о том, как сделать сохранение искры для файла с указанным именем файла?
Ответы
Ответ 1
Так как Spark использует API файловой системы Hadoop для записи данных в файлы, это неизбежно. Если вы делаете
rdd.saveAsTextFile("foo")
Он будет сохранен как "foo/part-XXXXX
" с одним part- * файлом каждого раздела в RDD, который вы пытаетесь сохранить. Причина, по которой каждый раздел в RDD написан отдельным файлом, относится к отказоустойчивости. Если задача, записывающая третий раздел (т.е. На part-00002
), не работает, Spark просто перезапускает задачу и перезаписывает частично написанную/поврежденную part-00002
, не влияя на другие части. Если все они писали в один файл, то для сбоев намного сложнее восстановить одну задачу.
Файлы part-XXXXX
обычно не являются проблемой, если вы собираетесь использовать его снова в инфраструктуре на основе Spark/Hadoop, потому что, поскольку все они используют API HDFS, если вы попросите их прочитать "foo", они все будут читать все файлы part-XXXXX
внутри foo.
Ответ 2
Я предлагаю сделать это таким образом (пример Java):
theRddToPrint.coalesce(1, true).saveAsTextFile(textFileName);
FileSystem fs = anyUtilClass.getHadoopFileSystem(rootFolder);
FileUtil.copyMerge(
fs, new Path(textFileName),
fs, new Path(textFileNameDestiny),
true, fs.getConf(), null);
Ответ 3
Существует другой подход, основанный на операциях Hadoop FileSystem.
Ответ 4
У меня есть идея, но не готовый фрагмент кода. Внутренне (как следует из названия) Spark использует выходной формат Hadoop. (а также InputFormat
при чтении из HDFS).
В hasoop FileOutputFormat
существует защищенный член setOutputFormat
, который вы можете вызывать из унаследованного класса для установки другого базового имени.
Ответ 5
Расширение Tathagata Das ответа на Spark 2.x и Scala 2.11
Используя Spark SQL, мы можем сделать это в один лайнер
//implicits for magic functions like .toDf
import spark.implicits._
val df = Seq(
("first", 2.0),
("choose", 7.0),
("test", 1.5)
).toDF("name", "vals")
//write DataFrame/DataSet to external storage
df.write
.format("csv")
.save("csv/file/location")
Затем вы можете пойти и продолжить с adoalonso ответом.
Ответ 6
Это не очень чистое решение, но внутри foreachRDD
() вы можете в основном делать все, что захотите, а также создавать новый файл.
В моем решении это то, что я делаю: я сохраняю вывод на HDFS (по причинам отказоустойчивости), а внутри foreachRDD
я также создаю файл TSV со статистикой в локальной папке.
Я думаю, вы могли бы сделать то же самое, если это вам нужно.
http://spark.apache.org/docs/0.9.1/streaming-programming-guide.html#output-operations