Как добавить новый столбец в Spark DataFrame (используя PySpark)?
У меня есть Spark DataFrame (с использованием PySpark 1.5.1) и хотел бы добавить новый столбец.
Я пробовал следующее без каких-либо успехов:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Также получена ошибка:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Итак, как мне добавить новый столбец (на основе вектора Python) в существующий DataFrame с PySpark?
Ответы
Ответ 1
Вы не можете добавить произвольный столбец в DataFrame
в Spark. Новые столбцы могут быть созданы только с использованием литералов (другие типы литералов описаны в Как добавить столбец констант в Spark DataFrame?)
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
преобразование существующего столбца:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
включен с использованием join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
или сгенерирован с функцией /udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Функциональные встроенные функции (pyspark.sql.functions
), которые соответствуют выражению Catalyst, обычно предпочтительнее, чем функции, определенные пользователем Python.
Если вы хотите добавить содержимое произвольного RDD в качестве столбца, вы можете
Ответ 2
Чтобы добавить столбец с помощью UDF:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
Ответ 3
Для Spark 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
Ответ 4
Я хотел бы предложить обобщенный пример для очень похожего варианта использования:
Вариант использования: у меня есть CSV, состоящий из:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
Мне нужно выполнить некоторые преобразования, и окончательный CSV должен выглядеть следующим образом
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
Я должен сделать это, потому что это схема, определенная некоторой моделью, и мне нужно, чтобы мои окончательные данные были совместимы с SQL Bulk Inserts и такими вещами.
так:
1) Я читаю оригинальный csv с помощью spark.read и называю его "df".
2) Я что-то делаю с данными.
3) Я добавляю нулевые столбцы, используя этот скрипт:
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
Таким образом, вы можете структурировать свою схему после загрузки CSV (также будет работать для переупорядочения столбцов, если вы должны сделать это для многих таблиц).
Ответ 5
Самый простой способ добавить столбец - использовать withColumn. Поскольку фрейм данных создается с использованием sqlContext, необходимо указать схему, или по умолчанию она может быть доступна в наборе данных. Если указана схема, при каждом изменении рабочая нагрузка становится утомительной.
Ниже приведен пример, который вы можете рассмотреть:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()
Ответ 6
Вы можете определить новый udf
при добавлении column_name
:
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
Ответ 7
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))