Spark sql как взорваться без потери нулевых значений
У меня есть датафрейм, который я пытаюсь сплющить. Как часть процесса, я хочу разобрать его, поэтому, если у меня есть столбец массивов, каждое значение массива будет использоваться для создания отдельной строки. Например,
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
должен стать
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
Это мой код
private DataFrame explodeDataFrame(DataFrame df) {
DataFrame resultDf = df;
for (StructField field : df.schema().fields()) {
if (field.dataType() instanceof ArrayType) {
resultDf = resultDf.withColumn(field.name(), org.apache.spark.sql.functions.explode(resultDf.col(field.name())));
resultDf.show();
}
}
return resultDf;
}
Проблема в том, что в моих данных некоторые столбцы массива имеют нулевые значения. В этом случае вся строка удаляется. Итак, этот фрейм данных:
id | name | likes
_______________________________
1 | Luke | [baseball, soccer]
2 | Lucy | null
становится
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
вместо
id | name | likes
_______________________________
1 | Luke | baseball
1 | Luke | soccer
2 | Lucy | null
Как я могу взорвать свои массивы, чтобы не потерять пустые строки?
Я использую Spark 1.5.2 и Java 8
Ответы
Ответ 1
Искра 2.2 +
Вы можете использовать функцию explode_outer
:
import org.apache.spark.sql.functions.explode_outer
df.withColumn("likes", explode_outer($"likes")).show
// +---+----+--------+
// | id|name| likes|
// +---+----+--------+
// | 1|Luke|baseball|
// | 1|Luke| soccer|
// | 2|Lucy| null|
// +---+----+--------+
Spark & lt; = 2.1
В Scala, но эквивалент Java должен быть почти идентичным (для импорта отдельных функций используйте import static
).
import org.apache.spark.sql.functions.{array, col, explode, lit, when}
val df = Seq(
(1, "Luke", Some(Array("baseball", "soccer"))),
(2, "Lucy", None)
).toDF("id", "name", "likes")
df.withColumn("likes", explode(
when(col("likes").isNotNull, col("likes"))
// If null explode an array<string> with a single null
.otherwise(array(lit(null).cast("string")))))
Идея здесь состоит в том, чтобы заменить NULL
на array(NULL)
желаемого типа. Для сложного типа (a.k.a structs
) вам необходимо предоставить полную схему:
val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")
val st = StructType(Seq(
StructField("_1", IntegerType, false), StructField("_2", StringType, true)
))
dfStruct.withColumn("y", explode(
when(col("y").isNotNull, col("y"))
.otherwise(array(lit(null).cast(st)))))
или
dfStruct.withColumn("y", explode(
when(col("y").isNotNull, col("y"))
.otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
Примечание
Если массив Column
был создан с containsNull
установленным в false
, вы должны сначала изменить это (проверено с помощью Spark 2.1):
df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))
Ответ 2
Вслед за принятым ответом, когда элементы массива являются сложным типом, может быть трудно определить его вручную (например, с большими структурами).
Чтобы сделать это автоматически, я написал следующий вспомогательный метод:
def explodeOuter(df: Dataset[Row], columnsToExplode: List[String]) = {
val arrayFields = df.schema.fields
.map(field => field.name -> field.dataType)
.collect { case (name: String, type: ArrayType) => (name, type.asInstanceOf[ArrayType])}
.toMap
columnsToExplode.foldLeft(df) { (dataFrame, arrayCol) =>
dataFrame.withColumn(arrayCol, explode(when(size(col(arrayCol)) =!= 0, col(arrayCol))
.otherwise(array(lit(null).cast(arrayFields(arrayCol).elementType)))))
}
Редактировать: кажется, что в Spark 2.2 и новее это встроено.
Ответ 3
Вы можете использовать функцию explode_outer()
.