Ответ 1
Хотя я по-прежнему предпочитаю синтаксис dplyr, этот фрагмент кода будет делать:
import pyspark.sql.functions as sf
df.groupBy("group")\
.agg(sf.sum('money').alias('money'))\
.show(100)
Он становится многословным.
Я анализирую некоторые данные с фреймами данных pyspark, предположим, что у меня есть dataframe df
, который я агрегирую:
df.groupBy("group")\
.agg({"money":"sum"})\
.show(100)
Это даст мне:
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
Агрегирование работает отлично, но мне не нравится новое имя столбца "СУММ (деньги # 2L)". Есть ли простой способ переименовать этот столбец в нечто, читаемое человеком из метода .agg
? Возможно, что-то более похожее на то, что можно было бы сделать в dplyr
:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
Хотя я по-прежнему предпочитаю синтаксис dplyr, этот фрагмент кода будет делать:
import pyspark.sql.functions as sf
df.groupBy("group")\
.agg(sf.sum('money').alias('money'))\
.show(100)
Он становится многословным.
withColumnRenamed
должен сделать трюк. Вот ссылка на pyspark.sql API.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
Я сделал для этого небольшую вспомогательную функцию, которая может помочь некоторым людям.
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names 'avg(colname)'
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
Пример:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
Делать хоть немного, чтобы уберечь людей от печатания так много.
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
start_index = column.find('(')
end_index = column.find(')')
if (start_index and end_index):
df = df.withColumnRenamed(column, column[start_index+1:end_index])
Приведенный выше код может удалить все, что находится за пределами "()". Например, "sum (foo)" будет переименован в "foo".
Это просто как:
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
Используйте .as
в agg, чтобы назвать новую созданную строку.
Учитывая, что у вас есть словарь columns_and_operations
и, после агрегирования, вы хотите сделать переименование без columns_and_operations
, более простым способом будет:
from functools import reduce
columns_and_operations = {
"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean"}
df = df.groupBy("group").agg(columns_and_operations)
old_names = ["{}({})".format(v, k) for k, v in columns_and_operations.items()]
new_names = list(columns_and_operations.keys())
df = reduce(lambda df, i: df.withColumnRenamed(old_names[i],
new_names[i]),
range(len(old_names)),
df)
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']
df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
| 1| siva| 100|
| 2|siva2| 200|
| 3|siva3| 300|
| 4|siva4| 400|
| 5|siva5| 500|
+---+-----+-------+
**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+