Ответ 1
Spark> = 2.3,> = 3.0
Начиная с OneHotEncoder
Spark 2.3 OneHotEncoder
устарела в пользу OneHotEncoderEstimator
. Если вы используете последнюю версию, пожалуйста, измените encoder
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
В Spark 3.0 этот вариант был переименован в OneHotEncoder
:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
inputCols=["gender_numeric"],
outputCols=["gender_vector"]
)
Дополнительно StringIndexer
был расширен для поддержки нескольких входных столбцов:
StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
Искра <2.3
Ну, вы можете написать UDF, но почему бы вам? Уже есть немало инструментов, предназначенных для решения этой категории задач:
from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector
row = Row("gender", "foo", "bar")
df = sc.parallelize([
row("0", 3.0, DenseVector([0, 2.1, 1.0])),
row("1", 1.0, DenseVector([0, 1.1, 1.0])),
row("1", -1.0, DenseVector([0, 3.4, 0.0])),
row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
Прежде всего StringIndexer
.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()
## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## | 0| 3.0| 0.0|
## | 1| 1.0| 1.0|
## | 1|-1.0| 1.0|
## | 0|-3.0| 0.0|
## +------+----+--------------+
Следующий OneHotEncoder
:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()
## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## | 0| 3.0| 0.0|(1,[0],[1.0])|
## | 1| 1.0| 1.0| (1,[],[])|
## | 1|-1.0| 1.0| (1,[],[])|
## | 0|-3.0| 0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
VectorAssembler
:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["gender_vector", "bar", "foo"], outputCol="features")
encoded_df_with_indexed_bar = (vector_indexer
.fit(encoded_df)
.transform(encoded_df))
final_df = assembler.transform(encoded_df)
Если bar
содержит категориальные переменные, вы можете использовать VectorIndexer
для установки необходимых метаданных:
from pyspark.ml.feature import VectorIndexer
vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")
но это не тот случай, здесь.
Наконец, вы можете обернуть все это с помощью конвейеров:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)
Возможно, это гораздо более надежный и чистый подход, чем писать все с нуля. Существуют некоторые предостережения, особенно когда вам необходимо согласованное кодирование между различными наборами данных. Вы можете прочитать больше в официальной документации для StringIndexer
и VectorIndexer
.
Другой способ получить сопоставимый результат - это RFormula
которая:
RFormula
создает векторный столбец объектов и двойной или строковый столбец метки. Как и при использовании формул в R для линейной регрессии, строковые входные столбцы будут кодироваться в горячем виде, а числовые столбцы приводятся к двойным значениям. Если столбец метки имеет тип string, он сначала преобразуется в удвоенный с помощьюStringIndexer
. Если столбец метки не существует в DataFrame, столбец выходной метки будет создан из указанной переменной ответа в формуле.
from pyspark.ml.feature import RFormula
rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)
Как вы можете видеть, он гораздо более лаконичен, но сложнее его сочинять не требует особых настроек. Тем не менее, результат для такого простого конвейера будет таким же:
final_df_rf.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
final_df.select("features").show(4, False)
## +----------------------+
## |features |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0]) |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+
По поводу ваших вопросов:
создать UDF с похожей функциональностью, которую я могу использовать в запросе Spark SQL (или другим способом, я полагаю)
Это просто UDF, как и любой другой. Убедитесь, что вы используете поддерживаемые типы, кроме того, все должно работать нормально.
взять RDD, полученный из карты, описанной выше, и добавить его в качестве нового столбца в фрейм данных user_data?
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField
schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)
Примечание:
Для Spark 1.x замените pyspark.ml.linalg
на pyspark.mllib.linalg
.