Spark 2.0.x выгружает csv файл из фрейма данных, содержащего один массив строки типа
У меня есть dataframe df
, который содержит один столбец массива типов
df.show()
выглядит как
|ID|ArrayOfString|Age|Gender|
+--+-------------+---+------+
|1 | [A,B,D] |22 | F |
|2 | [A,Y] |42 | M |
|3 | [X] |60 | F |
+--+-------------+---+------+
Я пытаюсь сбросить этот df
в файле csv следующим образом:
val dumpCSV = df.write.csv(path="/home/me/saveDF")
Он не работает из-за столбца ArrayOfString
. Я получаю сообщение об ошибке:
Источник данных CSV не поддерживает тип данных строки массива
Код работает, если я удаляю столбец ArrayOfString
. Но мне нужно сохранить ArrayOfString
!
Каким будет лучший способ сбросить фреймворк csv, включая столбец ArrayOfString (ArrayOfString должен быть сброшен как один столбец в файле CSV)
Ответы
Ответ 1
Причина, по которой вы получаете эту ошибку, заключается в том, что формат файла csv не поддерживает типы массивов, вам нужно выразить его как строку, которая сможет сохранить.
Попробуйте следующее:
import org.apache.spark.sql.functions._
val stringify = udf((vs: Seq[String]) => vs match {
case null => null
case _ => s"""[${vs.mkString(",")}]"""
})
df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)
или же
import org.apache.spark.sql.Column
def stringify(c: Column) = concat(lit("["), concat_ws(",", c), lit("]"))
df.withColumn("ArrayOfString", stringify($"ArrayOfString")).write.csv(...)
Ответ 2
Реализация Pyspark:
В этом примере перед сохранением измените поле column_as_array
на column_as_string
.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def array_to_string(my_list):
return '[' + ','.join([str(elem) for elem in my_list]) + ']'
array_to_string_udf = udf(array_to_string,StringType())
df = df.withColumn('column_as_str',array_to_string_udf(d["column_as_array"]))
Затем вы можете удалить старый столбец (тип массива) перед сохранением.
df.drop("column_as_array").write.csv(...)
Ответ 3
Нет необходимости в UDF, если вы уже знаете, какие поля содержат массивы. Вы можете просто использовать функцию Cast Spark:
val dumpCSV = df.withColumn("ArrayOfString", lit("ArrayOfString).cast("string"))
.write
.csv(path="/home/me/saveDF"
)
Надеюсь, это поможет.
Ответ 4
Вот метод для преобразования всех столбцов ArrayType
(любого базового типа) столбца DataFrame
в StringType
:
def stringifyArrays(dataFrame: DataFrame): DataFrame = {
val colsToStringify = dataFrame.schema.filter(p => p.dataType.typeName == "array").map(p => p.name)
colsToStringify.foldLeft(dataFrame)((df, c) => {
df.withColumn(c, concat(lit("["), concat_ws(", ", col(c).cast("array<string>")), lit("]")))
})
}
Кроме того, он не использует UDF.
Ответ 5
CSV не является идеальным форматом экспорта, но если вы просто хотите визуально проверить свои данные, это будет работать [ Scala]. Быстрое и грязное решение.
case class example ( id: String, ArrayOfString: String, Age: String, Gender: String)
df.rdd.map{line => example(line(0).toString, line(1).toString, line(2).toString , line(3).toString) }.toDF.write.csv("/tmp/example.csv")