Как написать полученный RDD в файл csv в Spark python
У меня есть RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
. Это выводится в этом формате:
[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
Я хочу создать CSV файл с одним столбцом для labels
(первая часть кортежа в выводе выше) и один для predictions
(вторая часть вывода кортежа). Но я не знаю, как писать в CSV файл в Spark с помощью Python.
Как создать файл CSV с указанным выше выходом?
Ответы
Ответ 1
Просто map
строки RDD (labelsAndPredictions
) в строки (строки CSV), затем используйте rdd.saveAsTextFile()
.
def toCSVLine(data):
return ','.join(str(d) for d in data)
lines = labelsAndPredictions.map(toCSVLine)
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv')
Ответ 2
Я знаю, что это старый пост. Но чтобы помочь кому-то найти то же самое, вот как я пишу два столбца RDD в один файл CSV в PySpark 1.6.2
RDD:
>>> rdd.take(5)
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
Теперь код:
# First I convert the RDD to dataframe
from pyspark import SparkContext
df = sqlContext.createDataFrame(rdd, ['count', 'word'])
DF:
>>> df.show()
+-----+-----------+
|count| word|
+-----+-----------+
|73342| cells|
|62861| cell|
|61714| studies|
|61377| aim|
|60168| clinical|
|59275| 2|
|59221| 1|
|58274| data|
|58087|development|
|56579| cancer|
|50243| disease|
|49817| provided|
|49216| specific|
|48857| health|
|48536| study|
|47827| project|
|45573|description|
|45455| applicant|
|44739| program|
|44522| patients|
+-----+-----------+
only showing top 20 rows
Теперь напишите в CSV
# Write CSV (I have HDFS storage)
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
P.S: Я только начинаю изучать сообщения из Stackoverflow. Поэтому я не знаю, лучший ли это. Но это сработало для меня, и я надеюсь, что это поможет кому-то!
Ответ 3
Нехорошо присоединяться запятыми, потому что если поля содержат запятые, они не будут правильно процитировать, например. ','.join(['a', 'b', '1,2,3', 'c'])
дает вам a,b,1,2,3,c
, если вы хотите a,b,"1,2,3",c
. Вместо этого вы должны использовать модуль csv Python для преобразования каждого списка в RDD в строчную строку csv:
# python 3
import csv, io
def list_to_csv_str(x):
"""Given a list of strings, returns a properly-csv-formatted string."""
output = io.StringIO("")
csv.writer(output).writerow(x)
return output.getvalue().strip() # remove extra newline
# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")
Поскольку модуль csv записывает только в объекты файла, мы должны создать пустой "файл" с io.StringIO("")
и сообщить csv.writer написать в него строку csv. Затем мы используем output.getvalue()
, чтобы получить строку, которую мы просто написали в "файл". Чтобы этот код работал с Python 2, просто замените io на модуль StringIO.
Если вы используете API-интерфейс Spark DataFrames, вы также можете посмотреть функцию сохранения DataBricks, которая имеет формат csv.