Разбить строку строки данных Spark Dataframe на несколько столбцов
Я видел различных людей, предлагающих, что Dataframe.explode
- полезный способ сделать это, но это приводит к большему количеству строк, чем исходный фреймворк данных, чего я совсем не хочу. Я просто хочу сделать эквивалент Dataframe очень простым:
rdd.map(lambda row: row + [row.my_str_col.split('-')])
который принимает что-то похожее:
col1 | my_str_col
-----+-----------
18 | 856-yygrm
201 | 777-psgdg
и преобразует его в это:
col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
Я знаю pyspark.sql.functions.split()
, но это приводит к столбцу вложенного массива вместо двух столбцов верхнего уровня, как я хочу.
В идеале я хочу, чтобы эти новые столбцы также были названы.
Ответы
Ответ 1
pyspark.sql.functions.split()
- правильный подход здесь - вам просто нужно сгладить вложенный столбец ArrayType на несколько столбцов верхнего уровня. В этом случае, когда каждый массив содержит только 2 элемента, это очень просто. Вы просто используете Column.getItem()
для извлечения каждой части массива как самого столбца:
split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))
Результат будет:
col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
18 | 856-yygrm | 856 | yygrm
201 | 777-psgdg | 777 | psgdg
Я не уверен, как бы я решил это в общем случае, когда вложенные массивы не были одинакового размера от строки до строки.
Ответ 2
Здесь решение общего случая, при котором не требуется заранее знать длину массива, используя collect
или udf
s. К сожалению, это работает только для spark
версии 2.1 и выше, поскольку для этого требуется функция posexplode
.
Предположим, у вас был следующий DataFrame:
df = spark.createDataFrame(
[
[1, 'A, B, C, D'],
[2, 'E, F, G'],
[3, 'H, I'],
[4, 'J']
]
, ["num", "letters"]
)
df.show()
#+---+----------+
#|num| letters|
#+---+----------+
#| 1|A, B, C, D|
#| 2| E, F, G|
#| 3| H, I|
#| 4| J|
#+---+----------+
Разделите столбец letters
, а затем используйте posexplode
, чтобы разбить результирующий массив вместе с позицией в массиве. Затем используйте pyspark.sql.functions.expr
, чтобы получить элемент с индексом pos
в этом массиве.
import pyspark.sql.functions as f
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.show()
#+---+------------+---+---+
#|num| letters|pos|val|
#+---+------------+---+---+
#| 1|[A, B, C, D]| 0| A|
#| 1|[A, B, C, D]| 1| B|
#| 1|[A, B, C, D]| 2| C|
#| 1|[A, B, C, D]| 3| D|
#| 2| [E, F, G]| 0| E|
#| 2| [E, F, G]| 1| F|
#| 2| [E, F, G]| 2| G|
#| 3| [H, I]| 0| H|
#| 3| [H, I]| 1| I|
#| 4| [J]| 0| J|
#+---+------------+---+---+
Теперь мы создаем две новые колонки из этого результата. Первый - это имя нашего нового столбца, который будет объединением letter
и индексом в массиве. Второй столбец будет значением соответствующего индекса в массиве. Мы получаем последнее, используя функциональность pyspark.sql.functions.expr
, которая позволяет нам использовать значения столбцов в качестве параметров.
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.show()
#+---+-------+---+
#|num| name|val|
#+---+-------+---+
#| 1|letter0| A|
#| 1|letter1| B|
#| 1|letter2| C|
#| 1|letter3| D|
#| 2|letter0| E|
#| 2|letter1| F|
#| 2|letter2| G|
#| 3|letter0| H|
#| 3|letter1| I|
#| 4|letter0| J|
#+---+-------+---+
Теперь мы можем просто groupBy
num
и pivot
DataFrame. Собрав все это вместе, мы получим:
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.groupBy("num").pivot("name").agg(f.first("val"))\
.show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#| 1| A| B| C| D|
#| 3| H| I| null| null|
#| 2| E| F| G| null|
#| 4| J| null| null| null|
#+---+-------+-------+-------+-------+
Ответ 3
Я нашел решение для общего неравномерного случая (или когда вы получаете вложенные столбцы, полученные с помощью функции .split()):
import pyspark.sql.functions as f
@f.udf(StructType([StructField(col_3, StringType(), True),
StructField(col_4, StringType(), True)]))
def splitCols(array):
return array[0], ''.join(array[1:len(array)])
df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
.select(df.columns+['name.*'])
По сути, вам просто нужно выбрать все предыдущие столбцы + вложенные столбцы 'column_name. *', и в этом случае вы получите их как два столбца верхнего уровня.
Ответ 4
Здесь другой подход, если вы хотите разбить строку с разделителем.
import pyspark.sql.functions as f
df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
df.show()
+--------+
| value|
+--------+
|1:a:2001|
|2:b:2002|
|3:c:2003|
+--------+
df_split = df.select(f.split(df.value,":")).rdd.flatMap(
lambda x: x).toDF(schema=["col1","col2","col3"])
df_split.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
| 1| a|2001|
| 2| b|2002|
| 3| c|2003|
+----+----+----+
Я не думаю, что этот переход назад и вперед к RDD замедлит вас...
Также не беспокойтесь о последней спецификации схемы: она необязательна, ее можно избежать, обобщая решение для данных с неизвестным размером столбца.