Слияние Spark выводит CSV файлы с одним заголовком
Я хочу создать конвейер обработки данных в AWS, чтобы в конечном итоге использовать обработанные данные для Machine Learning.
У меня есть Scala-скрипт, который берет необработанные данные из S3, обрабатывает его и записывает в HDFS или даже S3 с помощью Spark-CSV. Я думаю, что я могу использовать несколько файлов в качестве входных данных, если я хочу использовать инструмент AWS Machine Learning для обучения модели прогнозирования. Но если я хочу использовать что-то еще, я полагаю, что лучше всего получить один выходной файл CSV.
В настоящее время, поскольку я не хочу использовать перераспределение (1) и не объединять (1) для целей производительности, я использовал hadoop fs -getmerge для ручного тестирования, но поскольку он просто сливает содержимое выходных файлов задания, я запускаю в маленькую проблему. Мне нужна одна строка заголовков в файле данных для обучения модели прогнозирования.
Если я использую .option("header","true")
для spark-csv, тогда он записывает заголовки в каждый выходной файл, и после слияния у меня есть столько строк заголовков в данных, сколько есть выходных файлов. Но если параметр заголовка является ложным, то он не добавляет заголовков.
Теперь я нашел вариант слияния файлов внутри скрипта Scala с API-интерфейсом Hadoop FileUtil.copyMerge
. Я попробовал это в spark-shell
с помощью кода ниже.
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Но это решение по-прежнему просто объединяет файлы друг с другом и не обрабатывает заголовки. Как я могу получить выходной файл только с одной строкой заголовков?
Я даже попытался добавить df.columns.mkString(",")
в качестве последнего аргумента для copyMerge
, но это добавило заголовки еще несколько раз, а не один раз.
Ответы
Ответ 1
вы можете ходить так.
- 1.Создайте новый DataFrame (headerDF), содержащий имена заголовков.
- 2. Используйте его с DataFrame (dataDF), содержащим данные.
- 3. Выведите объединенный DataFrame на диск с опцией ("header", "false").
- Файлы раздела 4.merge(part-0000 ** 0.csv) с использованием файла hasuop FileUtil
Таким образом, все разделы не имеют заголовка, за исключением того, что содержимое одного раздела содержит строку заголовков заголовка из заголовка. Когда все разделы объединены вместе, в верхней части файла находится один заголовок. Пример кода:
//dataFrame is the data to save on disk
//cast types of all columns to String
val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)
//create a new data frame containing only header names
import scala.collection.JavaConverters._
val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)
//merge header names with data
headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)
//use hadoop FileUtil to merge all partition csv files into a single file
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
Ответ 2
Объединение файлов в папку в один файл:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}
Если вы хотите объединить все файлы в один файл, но все еще в одной папке (но это приводит все данные к узлу драйвера):
dataFrame
.coalesce(1)
.write
.format("com.databricks.spark.csv")
.option("header", "true")
.save(out)
Другим решением было бы использовать решение №2, а затем переместить один файл внутри папки на другой путь (с именем нашего CSV файла).
def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
val tmpDir = "tmpDir"
df.repartition(1)
.write
.format("com.databricks.spark.csv")
.option("header", header.toString)
.option("delimiter", sep)
.save(tmpDir)
val dir = new File(tmpDir)
val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
(new File(tmpCsvFile)).renameTo(new File(fileName))
dir.listFiles.foreach( f => f.delete )
dir.delete
}
Ответ 3
Попробуйте указать схему заголовка и прочитать весь файл из папки, используя опцию drop malformed of spark-csv. Это должно позволить вам прочитать все файлы в папке, содержащие только заголовки (потому что вы отбрасываете неверные данные). Пример:
val headerSchema = List(
StructField("example1", StringType, true),
StructField("example2", StringType, true),
StructField("example3", StringType, true)
)
val header_DF =sqlCtx.read
.option("delimiter", ",")
.option("header", "false")
.option("mode","DROPMALFORMED")
.option("inferSchema","false")
.schema(StructType(headerSchema))
.format("com.databricks.spark.csv")
.load("folder containg the files")
В header_DF у вас будут только строки заголовков, из которых вы можете преобразовать фреймворк так, как вам нужно.
Ответ 4
// Convert JavaRDD to CSV and save as text file
outputDataframe.write()
.format("com.databricks.spark.csv")
// Header => true, will enable to have header in each file
.option("header", "true")
Пожалуйста, перейдите по ссылке с тестом Integration о том, как писать один заголовок
http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/