Сохранение содержимого Spark DataFrame в виде единого файла CSV
Скажем, у меня есть Spark DataFrame, который я хочу сохранить как CSV файл. После Spark 2.0.0 класс DataFrameWriter напрямую поддерживает сохранение его как файла CSV.
Поведение по умолчанию - сохранить вывод в нескольких файлах part - *.csv внутри предоставленного пути.
Как мне сохранить DF с помощью
- Отображение пути к точному имени файла вместо папки
- Заголовок доступен в первой строке
- Сохранить как один файл вместо нескольких файлов.
Один из способов борьбы с ним - объединить DF, а затем сохранить файл.
df.coalesce(1).write.option("header", "true").csv("sample_file.csv")
Однако это имеет недостаток в сборе его на мастер-машине и требует наличия мастера с достаточной памятью.
Можно ли написать один CSV файл без использования coalesce? Если нет, существует ли эффективный способ, чем приведенный выше код?
Ответы
Ответ 1
Просто решил это сам, используя pyspark с dbutils, чтобы получить .csv и переименовать в нужное имя файла.
save_location= "s3a://landing-bucket-test/export/"+year
csv_location = save_location+"temp.folder'
file_location = save_location+'export.csv'
df.repartition(1).write.csv(path=csv_location, mode="append", header="true")
file = dbutils.fs.ls(csv_location)[-1].path
dbutils.fs.cp(file, file_location)
dbutils.fs.rm(csv_location, recurse=True)
Этот ответ можно улучшить, не используя [-1], но .csv, кажется, всегда последний в папке. Простое и быстрое решение, если вы работаете только с небольшими файлами и можете использовать перераспределение (1) или объединение (1).
Ответ 2
Использование:
df.toPandas().to_csv("sample_file.csv", header=True)
Подробности смотрите в документации:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrame.toPandas
Ответ 3
Это решение основано на оболочке Script и не распараллеливается, но все еще очень быстро, особенно на SSD. Он использует cat
и перенаправление вывода в Unix-системах. Предположим, что каталог CSV, содержащий разделы, находится на /my/csv/dir
и что выходной файл /my/csv/output.csv
:
#!/bin/bash
echo "col1,col2,col3" > /my/csv/output.csv
for i in /my/csv/dir/*.csv ; do
echo "Processing $i"
cat $i >> /my/csv/output.csv
rm $i
done
echo "Done"
Он удалит каждый раздел после добавления его в окончательный CSV, чтобы освободить место.
"col1,col2,col3"
- это заголовок CSV (здесь мы имеем три столбца с именем col1
, col2
и col3
). Вы должны сказать Spark, чтобы он не помещал заголовок в каждый раздел (это выполняется с помощью .option("header", "false")
, потому что это сделает Shell Script.
Ответ 4
Для тех, кто все еще хочет сделать это здесь, как я это сделал, используя искра 2.1 в scala с помощью некоторой справки java.nio.file
.
На основе https://fullstackml.com/how-to-export-data-frame-from-apache-spark-3215274ee9d6
val df: org.apache.spark.sql.DataFrame = ??? // data frame to write
val file: java.nio.file.Path = ??? // target output file (i.e. 'out.csv')
import scala.collection.JavaConversions._
// write csv into temp directory which contains the additional spark output files
// could use Files.createTempDirectory instead
val tempDir = file.getParent.resolve(file.getFileName + "_tmp")
df.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save(tempDir.toAbsolutePath.toString)
// find the actual csv file
val tmpCsvFile = Files.walk(tempDir, 1).iterator().toSeq.find { p =>
val fname = p.getFileName.toString
fname.startsWith("part-00000") && fname.endsWith(".csv") && Files.isRegularFile(p)
}.get
// move to desired final path
Files.move(tmpCsvFile, file)
// delete temp directory
Files.walk(tempDir)
.sorted(java.util.Comparator.reverseOrder())
.iterator().toSeq
.foreach(Files.delete(_))
Ответ 5
Следующий метод scala работает в локальном или клиентском режиме и записывает df в один CSV файл с выбранным именем. Это требует, чтобы df помещался в память, иначе collect() взорвется.
import org.apache.hadoop.fs.{FileSystem, Path}
val SPARK_WRITE_LOCATION = some_directory
val SPARKSESSION = org.apache.spark.sql.SparkSession
def saveResults(results : DataFrame, filename: String) {
var fs = FileSystem.get(this.SPARKSESSION.sparkContext.hadoopConfiguration)
if (SPARKSESSION.conf.get("spark.master").toString.contains("local")) {
fs = FileSystem.getLocal(new conf.Configuration())
}
val tempWritePath = new Path(SPARK_WRITE_LOCATION)
if (fs.exists(tempWritePath)) {
val x = fs.delete(new Path(SPARK_WRITE_LOCATION), true)
assert(x)
}
if (results.count > 0) {
val hadoopFilepath = new Path(SPARK_WRITE_LOCATION, filename)
val writeStream = fs.create(hadoopFilepath, true)
val bw = new BufferedWriter( new OutputStreamWriter( writeStream, "UTF-8" ) )
val x = results.collect()
for (row : Row <- x) {
val rowString = row.mkString(start = "", sep = ",", end="\n")
bw.write(rowString)
}
bw.close()
writeStream.close()
val resultsWritePath = new Path(WRITE_DIRECTORY, filename)
if (fs.exists(resultsWritePath)) {
fs.delete(resultsWritePath, true)
}
fs.copyToLocalFile(false, hadoopFilepath, resultsWritePath, true)
} else {
System.exit(-1)
}
}
Ответ 6
FileUtil.copyMerge() из API Hadoop должен решить вашу проблему.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
См. Запись одного CSV файла с использованием spark-csv
Ответ 7
Вот как работают распределенные вычисления! Несколько файлов внутри каталога - это точно то, как работают распределенные вычисления, это не проблема вообще, поскольку все программное обеспечение может справиться с этим.
Ваш вопрос должен быть "как можно загрузить CSV, состоящий из нескольких файлов?" → уже существует множество решений в SO.
Другим подходом может быть использование Spark в качестве источника JDBC (с удивительным сервером Spark Thrift), запись SQL-запроса и преобразование результата в CSV.
Чтобы предотвратить OOM в драйвере (поскольку драйвер получит ВСЕ данные), используйте инкрементный сбор (spark.sql.thriftServer.incrementalCollect=true
), больше информации на http://www.russellspitzer.com/2017/05/19/Spark-Sql-Thriftserver/.
Небольшая заметка о концепции "данных раздела" Искры ":
INPUT (X PARTITIONs) -> COMPUTING (Y PARTITIONs) -> OUTPUT (Z PARTITIONs)
Между "этапами" данные могут передаваться между разделами, это "тасование". Вы хотите "Z" = 1, но с Y > 1, без тасования? это невозможно.
Ответ 8
df.coalesce(1).write.option("inferSchema","true").csv("/newFolder",header =
'true',dateFormat = "yyyy-MM-dd HH:mm:ss")