Pyspark: разделение столбцов нескольких массивов на строки
У меня есть dataframe, который имеет одну строку и несколько столбцов. Некоторые из столбцов являются одиночными значениями, а другие - списками. Все столбцы списка имеют одинаковую длину. Я хочу разбить каждый столбец списка на отдельную строку, сохраняя любой столбец без списка, как есть.
Пример DF:
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
Что я хочу:
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
Если бы у меня был только один столбец списка, это было бы легко, просто сделав explode
:
df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
Однако, если я попытаюсь также explode
столбец c
, я получаю данные с длиной, квадратом которой я хочу:
df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
Что я хочу - для каждого столбца возьмите n-й элемент массива в этом столбце и добавьте его в новую строку. Я пробовал сопоставлять разнесение по всем столбцам в фреймворке данных, но это тоже не работает:
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
Ответы
Ответ 1
С DataFrames
и UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
С RDDs
:
(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))
Оба решения неэффективны из-за накладных расходов на Python. Если размер данных исправлен, вы можете сделать что-то вроде этого:
from functools import reduce
from pyspark.sql import DataFrame
# Length of array
n = 3
# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")
или даже:
from pyspark.sql.functions import array, struct
# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))
(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
Это должно быть значительно быстрее по сравнению с UDF или RDD. Обобщены для поддержки произвольного количества столбцов:
# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))
df.withColumn("tmp", zip_and_explode("b", "c", n=3))
Ответ 2
Вам нужно использовать flatMap
, а не map
, поскольку вы хотите сделать несколько выходных строк из каждой строки ввода.
from pyspark.sql import Row
def dualExplode(r):
rowDict = r.asDict()
bList = rowDict.pop('b')
cList = rowDict.pop('c')
for b,c in zip(bList, cList):
newDict = dict(rowDict)
newDict['b'] = b
newDict['c'] = c
yield Row(**newDict)
df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))