Обновление столбца dataframe в искровом режиме
Взглянув на новую диаграмму api-диаграммы искры, неясно, можно ли изменять столбцы данных.
Как мне изменить значение в строке x
столбца y
в кадре данных?
В pandas
это будет df.ix[x,y] = new_value
Изменить. Объединив сказанное ниже, вы не можете изменять существующий фреймворк данных, поскольку он является неизменным, но вы можете вернуть новый фреймворк с желаемыми изменениями.
Если вы просто хотите заменить значение в столбце на основе условия, например np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Если вы хотите выполнить некоторую операцию над столбцом и создать новый столбец, который добавлен в фреймворк данных:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
Если вы хотите, чтобы новый столбец имел то же имя, что и старый столбец, вы можете добавить дополнительный шаг:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Ответы
Ответ 1
Пока вы не можете изменить столбец как таковой, вы можете работать с столбцом и возвращать новый DataFrame, отражающий это изменение. Для этого вы должны сначала создать UserDefinedFunction
реализацию применяемой операции, а затем выборочно применить эту функцию только к целевому столбцу. В Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
теперь имеет ту же схему, что и old_df
(предполагая, что old_df.target_column
имеет тип StringType
), но все значения в столбце target_column
будут new_value
.
Ответ 2
Обычно при обновлении столбца мы хотим сопоставить старое значение с новым значением. Здесь можно сделать это в pyspark без UDF's:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
Ответ 3
DataFrames
основаны на RDD. RDD являются неизменяемыми структурами и не позволяют обновлять элементы на месте. Чтобы изменить значения, вам нужно будет создать новый DataFrame, преобразовывая исходный либо с помощью SQL-подобных операций DSL или RDD, таких как map
.
Рекомендуемая слайд-панель: Представление DataFrames в Spark для крупномасштабных научных исследований.
Ответ 4
Так же, как maasg говорит, что вы можете создать новый DataFrame из результата карты, примененной к старым DataFrame. Пример для данного DataFrame df
с двумя строками:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
Обратите внимание, что если типы столбцов меняются, вам нужно указать правильную схему вместо df.schema
. Проверьте api org.apache.spark.sql.Row
на доступные методы: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[Обновить] Или используя UDF в Scala:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
и если имя столбца должно оставаться неизменным, вы можете переименовать его:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")