Применение UDF в GroupedData в PySpark (с действующим примером python)
У меня есть этот код python, который выполняется локально в фрейме pandas:
df_result = pd.DataFrame(df
.groupby('A')
.apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Я хотел бы запустить это в PySpark, но у меня проблемы с объектом pyspark.sql.group.GroupedData.
Я пробовал следующее:
sparkDF
.groupby('A')
.agg(myFunction(zip('B', 'C'), 'A'))
который возвращает
KeyError: 'A'
Я предполагаю, что "A" больше не является столбцом, и я не могу найти эквивалент для x.name.
И затем
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
.toDF()
но получите следующую ошибку:
AttributeError: 'GroupedData' object has no attribute 'map'
Любые предложения будут действительно оценены!
Ответы
Ответ 1
То, что вы пытаетесь сделать, это написать UDAF (пользовательскую агрегированную функцию), а не UDF (пользовательскую функцию). UDAF - это функции, которые работают с данными, сгруппированными по ключу. В частности, они должны определить, как объединить несколько значений в группе в одном разделе, а затем, как объединить результаты по разделам для ключа. В настоящее время в python нет способа реализовать UDAF, они могут быть реализованы только в Scala.
Но вы можете обойти это в Python. Вы можете использовать набор сбора для сбора сгруппированных значений, а затем использовать обычную пользовательскую функцию, чтобы делать с ними то, что вы хотите. Единственное предостережение в том, что collect_set работает только с примитивными значениями, поэтому вам нужно будет закодировать их в строку.
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf
def myFunc(data_list):
for val in data_list:
b, c = data.split(',')
# do something
return <whatever>
myUdf = udf(myFunc, StringType())
df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
.groupBy('A').agg(collect_list('data').alias('data'))
.withColumn('data', myUdf('data'))
Используйте collect_set, если вы хотите дедупликации. Кроме того, если у вас есть много значений для некоторых из ваших ключей, это будет медленно, потому что все значения для ключа нужно будет собрать в одном разделе где-то в вашем кластере. Если ваш конечный результат представляет собой значение, которое вы строите, комбинируя значения для каждого ключа каким-либо образом (например, суммируя их), возможно, будет быстрее реализовать его, используя метод RDD aggregateByKey, который позволяет вам создать промежуточное значение для каждого ключа в разделе перед перетасовывать данные вокруг.
РЕДАКТИРОВАТЬ: 21.11.2008
Поскольку этот ответ был написан, pyspark добавил поддержку UDAF с использованием Pandas. Есть несколько приятных улучшений производительности при использовании UDF и UDAF Panda по сравнению с прямыми функциями Python с RDD. Под капотом он векторизует столбцы (объединяет значения из нескольких строк для оптимизации обработки и сжатия). Посмотрите здесь для лучшего объяснения или посмотрите на user6910411 ответ ниже для примера.
Ответ 2
Начиная с Spark 2.3 вы можете использовать pandas_udf
. GROUPED_MAP
принимает Callable[[pandas.DataFrame], pandas.DataFrame]
или, другими словами, функцию, которая отображает из Pandas DataFrame
той же формы, что и вход, в выходной DataFrame
.
Например, если данные выглядят так:
df = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
и вы хотите вычислить среднее значение попарно мин между value1
value2
, вы должны определить выходную схему:
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
pandas_udf
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
result = pd.DataFrame(df.groupby(df.key).apply(
lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
))
result.reset_index(inplace=True, drop=False)
return result
и применить его:
df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
За исключением определения схемы и декоратора, ваш текущий код Pandas может быть применен как есть.
Начиная с Spark 2.4.0 существует также вариант GROUPED_AGG
, который принимает Callable[[pandas.Series,...], T]
, где T
- примитивный скаляр:
import numpy as np
@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
return np.minimum(x, y).mean()
который может использоваться со стандартной конструкцией group_by
/agg
:
df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
Обратите внимание, что ни GROUPED_MAP
ни GROUPPED_AGG
pandas_udf
ведут себя так же, как UserDefinedAggregateFunction
или Aggregator
, и они ближе к groupByKey
или оконным функциям с неограниченным фреймом. Сначала данные перетасовываются, и только после этого применяется UDF.
Для оптимизированного выполнения вы должны реализовать Scala UserDefinedAggregateFunction
и добавить обертку Python.
Ответ 3
Я собираюсь расширить ответ.
Таким образом, вы можете реализовать такую же логику, как pandas.groupby(). Применить в pyspark, используя @pandas_udf, и который является методом векторизации и быстрее, чем простой udf.
from pyspark.sql.functions import pandas_udf,PandasUDFType
df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_value1", DoubleType()),
StructField("avg_value2", DoubleType()),
StructField("sum_avg", DoubleType()),
StructField("sub_avg", DoubleType())
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
gr = df['key'].iloc[0]
x = df.value1.mean()
y = df.value2.mean()
w = df.value1.mean() + df.value2.mean()
z = df.value1.mean() - df.value2.mean()
return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])
df3.groupby("key").apply(g).show()
Вы получите результат ниже:
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
| b| 6.5| -1.5| 5.0| 8.0|
| a| 0.0| 21.0| 21.0| -21.0|
+---+----------+----------+-------+-------+
Таким образом, вы можете делать больше вычислений между другими полями в сгруппированных данных и добавлять их в dataframe в формате списка.