Автоматически и элегантно сглаживает DataFrame в Spark SQL
Все,
Есть ли элегантный и принятый способ сгладить таблицу Spark SQL (паркет) с столбцами, вложенными StructType
Например
Если моя схема:
foo
|_bar
|_baz
x
y
z
Как выбрать его в плоскую табличную форму, не прибегая к ручному запуску
df.select("foo.bar","foo.baz","x","y","z")
Другими словами, как получить результат вышеуказанного кода программно с учетом только StructType
и DataFrame
Ответы
Ответ 1
Короткий ответ: там нет "приемлемого" способа сделать это, но вы можете сделать это очень элегантно с рекурсивной функцией, которая генерирует ваш оператор select(...)
, пройдя через DataFrame.schema
.
Рекурсивная функция должна возвращать Array[Column]
. Каждый раз, когда функция попадает в StructType
, она сама вызывается и добавляет возвращенный Array[Column]
в свой собственный Array[Column]
.
Что-то вроде:
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName))
}
})
}
Затем вы будете использовать его следующим образом:
df.select(flattenSchema(df.schema):_*)
Ответ 2
Я улучшаю свой предыдущий ответ и предлагаю решение своей проблемы, изложенную в комментариях к принятому ответу.
Это принятое решение создает массив объектов Column и использует его для выбора этих столбцов. В Spark, если у вас есть вложенный DataFrame, вы можете выбрать дочерний столбец следующим образом: df.select("Parent.Child")
, и это возвращает DataFrame со значениями дочернего столбца и называется Ребенок. Но если у вас одинаковые имена для атрибутов разных родительских структур, вы теряете информацию о родительском элементе и можете иметь идентичные имена столбцов и больше не можете обращаться к ним по имени, поскольку они недвусмысленны.
Это была моя проблема.
Я нашел решение своей проблемы, возможно, это может помочь кому-то еще. Я назвал flattenSchema
отдельно:
val flattenedSchema = flattenSchema(df.schema)
и это возвращает объект Array of Column. Вместо использования этого параметра в select()
, который вернет DataFrame с столбцами, названными дочерним элементом последнего уровня, я сопоставил исходные имена столбцов как строки, а после выбора столбца Parent.Child
он переименовал его как Parent.Child
вместо Child
(я также заменил точки с подчеркиванием для моего удобства):
val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))
И затем вы можете использовать функцию выбора, как показано в исходном ответе:
var newDf = df.select(renamedCols:_*)
Ответ 3
Просто хотел поделиться своим решением для Pyspark - это более или менее перевод @David Griffin solution, поэтому он поддерживает любой уровень вложенных объектов.
from pyspark.sql.types import StructType, ArrayType
def flatten(schema, prefix=None):
fields = []
for field in schema.fields:
name = prefix + '.' + field.name if prefix else field.name
dtype = field.dataType
if isinstance(dtype, ArrayType):
dtype = dtype.elementType
if isinstance(dtype, StructType):
fields += flatten(dtype, prefix=name)
else:
fields.append(name)
return fields
df.select(flatten(df.schema)).show()
Ответ 4
Вы также можете использовать SQL для выбора столбцов как плоских.
- Получить исходную схему фрейма данных
- Генерировать строку SQL, просматривая схему
- Запросить исходный кадр данных
Я реализовал реализацию на Java: https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48
(используйте рекурсивный метод, я предпочитаю SQL-путь, поэтому вы можете легко его протестировать через Spark-shell).
Ответ 5
Вот функция, которая делает то, что вы хотите, и может иметь дело с несколькими вложенными столбцами, содержащими столбцы с одинаковым именем, с префиксом:
from pyspark.sql import functions as F
def flatten_df(nested_df):
flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(flat_cols +
[F.col(nc+'.'+c).alias(nc+'_'+c)
for nc in nested_cols
for c in nested_df.select(nc+'.*').columns])
return flat_df
До:
root
|-- x: string (nullable = true)
|-- y: string (nullable = true)
|-- foo: struct (nullable = true)
| |-- a: float (nullable = true)
| |-- b: float (nullable = true)
| |-- c: integer (nullable = true)
|-- bar: struct (nullable = true)
| |-- a: float (nullable = true)
| |-- b: float (nullable = true)
| |-- c: integer (nullable = true)
После:
root
|-- x: string (nullable = true)
|-- y: string (nullable = true)
|-- foo_a: float (nullable = true)
|-- foo_b: float (nullable = true)
|-- foo_c: integer (nullable = true)
|-- bar_a: float (nullable = true)
|-- bar_b: float (nullable = true)
|-- bar_c: integer (nullable = true)
Ответ 6
Я добавил метод DataFrame#flattenSchema
в проект spark-daria с открытым исходным кодом.
Вот как вы можете использовать функцию с вашим кодом.
import com.github.mrpowers.spark.daria.sql.DataFrameExt._
df.flattenSchema().show()
+-------+-------+---------+----+---+
|foo.bar|foo.baz| x| y| z|
+-------+-------+---------+----+---+
| this| is|something|cool| ;)|
+-------+-------+---------+----+---+
Вы также можете указать разные разделители имен столбцов с помощью flattenSchema()
.
df.flattenSchema(delimiter = "_").show()
+-------+-------+---------+----+---+
|foo_bar|foo_baz| x| y| z|
+-------+-------+---------+----+---+
| this| is|something|cool| ;)|
+-------+-------+---------+----+---+
Этот параметр разделителя удивительно важен. Если вы выравниваете свою схему для загрузки таблицы в Redshift, вы не сможете использовать точки в качестве разделителя.
Вот полный фрагмент кода для генерации этого вывода.
val data = Seq(
Row(Row("this", "is"), "something", "cool", ";)")
)
val schema = StructType(
Seq(
StructField(
"foo",
StructType(
Seq(
StructField("bar", StringType, true),
StructField("baz", StringType, true)
)
),
true
),
StructField("x", StringType, true),
StructField("y", StringType, true),
StructField("z", StringType, true)
)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(data),
StructType(schema)
)
df.flattenSchema().show()
Базовый код аналогичен коду Дэвида Гриффина (в случае, если вы не хотите добавлять зависимость spark-daria в ваш проект).
object StructTypeHelpers {
def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
schema.fields.flatMap(structField => {
val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name
structField.dataType match {
case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
case _ => Array(col(codeColName).alias(colName))
}
})
}
}
object DataFrameExt {
implicit class DataFrameMethods(df: DataFrame) {
def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
df.select(
StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
)
}
}
}
Ответ 7
Чтобы объединить ответы Дэвида Гриффена и В. Саммы, вы можете просто сделать это, чтобы сгладить, избегая дублирования имен столбцов:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Column
import org.apache.spark.sql.DataFrame
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName).as(colName.replace(".","_")))
}
})
}
def flattenDataFrame(df:DataFrame): DataFrame = {
df.select(flattenSchema(df.schema):_*)
}
var my_flattened_json_table = flattenDataFrame(my_json_table)
Ответ 8
========== edit ====
Здесь есть дополнительная обработка для более сложных схем: https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44
==================
PySpark, добавленный к ответу @Evan V, когда имена ваших полей содержат специальные символы, такие как точка '.', Дефис '-',...:
from pyspark.sql.types import StructType, ArrayType
def normalise_field(raw):
return raw.strip().lower() \
.replace(''', '') \
.replace('-', '_') \
.replace(' ', '_') \
.strip('_')
def flatten(schema, prefix=None):
fields = []
for field in schema.fields:
name = "%s.'%s'" % (prefix, field.name) if prefix else "'%s'" % field.name
dtype = field.dataType
if isinstance(dtype, ArrayType):
dtype = dtype.elementType
if isinstance(dtype, StructType):
fields += flatten(dtype, prefix=name)
else:
fields.append(col(name).alias(normalise_field(name)))
return fields
df.select(flatten(df.schema)).show()
Ответ 9
Я использовал один лайнер, который приводит к сплющенной схеме с 5 столбцами бара, baz, x, y, z:
df.select("foo.*", "x", "y", "z")
Что касается explode
: я обычно резервирую explode
для выравнивания списка. Например, если у вас есть столбец idList
, который является списком строк, вы можете сделать:
df.withColumn("flattenedId", functions.explode(col("idList")))
.drop("idList")
Это приведет к созданию нового Dataframe с столбцом с именем flattenedId
(больше не список)
Ответ 10
Это модификация решения, но она использует обозначение tailrec.
@tailrec
def flattenSchema(
splitter: String,
fields: List[(StructField, String)],
acc: Seq[Column]): Seq[Column] = {
fields match {
case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
val newPrefix = s"$prefix${field.name}."
val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
flattenSchema(splitter, tail ++ newFields, acc)
case (field, prefix) :: tail =>
val colName = s"$prefix${field.name}"
val newCol = col(colName).as(colName.replace(".", splitter))
flattenSchema(splitter, tail, acc :+ newCol)
case _ => acc
}
}
def flattenDataFrame(df: DataFrame): DataFrame = {
val fields = df.schema.fields.map((_, ""))
df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
}
Ответ 11
Небольшое дополнение к приведенному выше коду, если вы работаете с Nested Struct и Array.
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f match {
case StructField(_, struct:StructType, _, _) => flattenSchema(struct, colName)
case StructField(_, ArrayType(x :StructType, _), _, _) => flattenSchema(x, colName)
case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
case _ => Array(col(colName))
}
})
}