Разбить строку строки данных Spark Dataframe на несколько столбцов

Я видел различных людей, предлагающих, что Dataframe.explode - полезный способ сделать это, но это приводит к большему количеству строк, чем исходный фреймворк данных, чего я совсем не хочу. Я просто хочу сделать эквивалент Dataframe очень простым:

rdd.map(lambda row: row + [row.my_str_col.split('-')])

который принимает что-то похожее:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

и преобразует его в это:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Я знаю pyspark.sql.functions.split(), но это приводит к столбцу вложенного массива вместо двух столбцов верхнего уровня, как я хочу.

В идеале я хочу, чтобы эти новые столбцы также были названы.

Ответы

Ответ 1

pyspark.sql.functions.split() - правильный подход здесь - вам просто нужно сгладить вложенный столбец ArrayType на несколько столбцов верхнего уровня. В этом случае, когда каждый массив содержит только 2 элемента, это очень просто. Вы просто используете Column.getItem() для извлечения каждой части массива как самого столбца:

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))

Результат будет:

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Я не уверен, как бы я решил это в общем случае, когда вложенные массивы не были одинакового размера от строки до строки.

Ответ 2

Здесь решение общего случая, при котором не требуется заранее знать длину массива, используя collect или udf s. К сожалению, это работает только для spark версии 2.1 и выше, поскольку для этого требуется функция posexplode.

Предположим, у вас был следующий DataFrame:

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

Разделите столбец letters, а затем используйте posexplode, чтобы разбить результирующий массив вместе с позицией в массиве. Затем используйте pyspark.sql.functions.expr, чтобы получить элемент с индексом pos в этом массиве.

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

Теперь мы создаем две новые колонки из этого результата. Первый - это имя нашего нового столбца, который будет объединением letter и индексом в массиве. Второй столбец будет значением соответствующего индекса в массиве. Мы получаем последнее, используя функциональность pyspark.sql.functions.expr, которая позволяет нам использовать значения столбцов в качестве параметров.

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

Теперь мы можем просто groupBy num и pivot DataFrame. Собрав все это вместе, мы получим:

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+

Ответ 3

Я нашел решение для общего неравномерного случая (или когда вы получаете вложенные столбцы, полученные с помощью функции .split()):

import pyspark.sql.functions as f

@f.udf(StructType([StructField(col_3, StringType(), True),
                   StructField(col_4, StringType(), True)]))

 def splitCols(array):
    return array[0],  ''.join(array[1:len(array)])

 df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
        .select(df.columns+['name.*'])

По сути, вам просто нужно выбрать все предыдущие столбцы + вложенные столбцы 'column_name. *', и в этом случае вы получите их как два столбца верхнего уровня.

Ответ 4

Здесь другой подход, если вы хотите разбить строку с разделителем.

import pyspark.sql.functions as f

df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
df.show()
+--------+
|   value|
+--------+
|1:a:2001|
|2:b:2002|
|3:c:2003|
+--------+

df_split = df.select(f.split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"])

df_split.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|2001|
|   2|   b|2002|
|   3|   c|2003|
+----+----+----+

Я не думаю, что этот переход назад и вперед к RDD замедлит вас... Также не беспокойтесь о последней спецификации схемы: она необязательна, ее можно избежать, обобщая решение для данных с неизвестным размером столбца.