Добавить сумму столбца в виде нового столбца в фреймворке PySpark
Я использую PySpark, и у меня есть фреймворк Spark с кучей числовых столбцов. Я хочу добавить столбец, который является суммой всех остальных столбцов.
Предположим, что в моем кадре данных есть столбцы "a", "b" и "c". Я знаю, что могу это сделать:
df.withColumn('total_col', df.a + df.b + df.c)
Проблема в том, что я не хочу набирать каждый столбец отдельно и добавлять их, особенно если у меня много столбцов. Я хочу иметь возможность сделать это автоматически или указав список имен столбцов, которые я хочу добавить. Есть ли другой способ сделать это?
Ответы
Ответ 1
Это не было очевидно. Я не вижу суммы столбцов, определенных в искровом Dataframes API.
Версия 2
Это можно сделать довольно простым способом:
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
df.columns
предоставляется pyspark как список строк, дающих все имена столбцов в фреймворке Spark. Для другой суммы вы можете указать любой другой список имен столбцов.
Я не пробовал это в качестве своего первого решения, потому что не был уверен, как он будет себя вести. Но он работает.
Версия 1
Это слишком сложно, но работает также.
Вы можете сделать это:
- используйте
df.columns
, чтобы получить список имен столбцов
- используйте этот список имен, чтобы составить список столбцов
- передать этот список тому, что вызовет функцию перегруженного столбца в функциональном стиле fold-type
С python reduce, некоторые знания о том, как работает перегрузка операторов, и код pyspark для столбцов здесь, который становится:
def column_add(a,b):
return a.__add__(b)
newdf = df.withColumn('total_col',
reduce(column_add, ( df[col] for col in df.columns ) ))
Обратите внимание, что это сокращение python, а не сокращение RDD искры, а термин скобки во втором параметре для уменьшения требует скобки, потому что это выражение генератора списков.
Протестировано, работает!
$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
... return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
Ответ 2
Моя проблема была похожа на описанную выше (немного сложнее), так как мне приходилось добавлять последовательные суммы столбцов в качестве новых столбцов в кадре данных PySpark. Этот подход использует код из Пола версии 1 выше:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
,(6,1,-4),(0,2,-2),(6,4,1)\
,(4,5,2),(5,-3,-5),(6,4,-1)]\
,schema=['x1','x2','x3'])
df.show()
+---+---+---+
| x1| x2| x3|
+---+---+---+
| 1| 2| 3|
| 4| 5| 6|
| 3| 2| 1|
| 6| 1| -4|
| 0| 2| -2|
| 6| 4| 1|
| 4| 5| 2|
| 5| -3| -5|
| 6| 4| -1|
+---+---+---+
colnames=df.columns
добавить новые столбцы, которые являются накопительными суммами (подряд):
for i in range(0,len(colnames)):
colnameLst= colnames[0:i+1]
colname = 'cm'+ str(i+1)
df = df.withColumn(colname, sum(df[col] for col in colnameLst))
df.show()
+---+---+---+---+---+---+
| x1| x2| x3|cm1|cm2|cm3|
+---+---+---+---+---+---+
| 1| 2| 3| 1| 3| 6|
| 4| 5| 6| 4| 9| 15|
| 3| 2| 1| 3| 5| 6|
| 6| 1| -4| 6| 7| 3|
| 0| 2| -2| 0| 2| 0|
| 6| 4| 1| 6| 10| 11|
| 4| 5| 2| 4| 9| 11|
| 5| -3| -5| 5| 2| -3|
| 6| 4| -1| 6| 10| 9|
+---+---+---+---+---+---+
Добавлены следующие столбцы "накопленная сумма":
cm1 = x1
cm2 = x1 + x2
cm3 = x1 + x2 + x3
Ответ 3
Решение
newdf = df.withColumn('total', sum(df[col] for col in df.columns))
опубликовано @Paul работ. Тем не менее, я получил ошибку, как и многие другие, как я видел,
TypeError: 'Column' object is not callable
Через некоторое время я обнаружил проблему (по крайней мере, в моем случае). Проблема в том, что я ранее импортировал некоторые функции pyspark со строкой
from pyspark.sql.functions import udf, col, count, sum, when, avg, mean, min
поэтому строка импортировала команду sum
pyspark, тогда как df.withColumn('total', sum(df[col] for col in df.columns))
должен использовать обычную функцию sum
Python.
Вы можете удалить ссылку на функцию pyspark с помощью del sum
.
В противном случае в моем случае я изменил импорт на
import pyspark.sql.functions as F
и затем ссылался на функции как F.sum
.