Кодировать и собирать несколько функций в PySpark

У меня есть класс Python, который я использую для загрузки и обработки некоторых данных в Spark. Среди различных вещей, которые мне нужно сделать, я создаю список фиктивных переменных, полученных из разных столбцов в фреймворке Spark. Моя проблема заключается в том, что я не уверен, как правильно определить пользовательскую функцию, чтобы выполнить то, что мне нужно.

В настоящее время у меня есть метод, который при сопоставлении с базовым фреймворком данных RDD решает половину проблемы (помните, что это метод в более крупном классе data_processor):

def build_feature_arr(self,table):
    # this dict has keys for all the columns for which I need dummy coding
    categories = {'gender':['1','2'], ..}

    # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file
    if table == 'users':
        iter_over = self.config.dyadic_features_to_include
    elif table == 'activty':
        iter_over = self.config.user_features_to_include

    def _build_feature_arr(row):
        result = []
        row = row.asDict()
        for col in iter_over:
            column_value = str(row[col]).lower()
            cats = categories[col]
            result += [1 if column_value and cat==column_value else 0 for cat in cats]
        return result
    return _build_feature_arr

По существу, это означает, что для указанного фреймворка принимает значения категориальной переменной для указанных столбцов и возвращает список значений этих новых фиктивных переменных. Это означает следующий код:

data = data_processor(init_args)
result = data.user_data.rdd.map(self.build_feature_arr('users'))

возвращает что-то вроде:

In [39]: result.take(10)
Out[39]:
[[1, 0, 0, 0, 1, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 0],
 [1, 0, 1, 0, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [0, 1, 1, 0, 0, 0],
 [1, 0, 1, 1, 0, 0],
 [1, 0, 0, 1, 0, 0],
 [1, 0, 0, 0, 0, 1]]

Это именно то, что я хочу с точки зрения генерации списка фиктивных переменных, которые я хочу, но здесь мой вопрос: как я могу (а) создать UDF с аналогичной функциональностью, которую я могу использовать в запросе Spark SQL (или каким-либо другим способом, я полагаю), или (б) взять RDD, полученную в результате описанной выше карты, и добавить ее в качестве нового столбца в dataframe user_data?

В любом случае, мне нужно создать новый фреймворк данных, содержащий столбцы из user_data, а также новый столбец (пусть его называют feature_array), содержащий вывод функции выше (или что-то функционально эквивалентное).

Ответы

Ответ 1

Spark> = 2.3,> = 3.0

Начиная с OneHotEncoder Spark 2.3 OneHotEncoder устарела в пользу OneHotEncoderEstimator. Если вы используете последнюю версию, пожалуйста, измените encoder

from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

В Spark 3.0 этот вариант был переименован в OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

Дополнительно StringIndexer был расширен для поддержки нескольких входных столбцов:

StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])

Искра <2.3

Ну, вы можете написать UDF, но почему бы вам? Уже есть немало инструментов, предназначенных для решения этой категории задач:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
  row("0", 3.0, DenseVector([0, 2.1, 1.0])),
  row("1", 1.0, DenseVector([0, 1.1, 1.0])),
  row("1", -1.0, DenseVector([0, 3.4, 0.0])),
  row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

Прежде всего StringIndexer.

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+

Следующий OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+

VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

final_df = assembler.transform(encoded_df)

Если bar содержит категориальные переменные, вы можете использовать VectorIndexer для установки необходимых метаданных:

from pyspark.ml.feature import VectorIndexer

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")

но это не тот случай, здесь.

Наконец, вы можете обернуть все это с помощью конвейеров:

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)

Возможно, это гораздо более надежный и чистый подход, чем писать все с нуля. Существуют некоторые предостережения, особенно когда вам необходимо согласованное кодирование между различными наборами данных. Вы можете прочитать больше в официальной документации для StringIndexer и VectorIndexer.

Другой способ получить сопоставимый результат - это RFormula которая:

RFormula создает векторный столбец объектов и двойной или строковый столбец метки. Как и при использовании формул в R для линейной регрессии, строковые входные столбцы будут кодироваться в горячем виде, а числовые столбцы приводятся к двойным значениям. Если столбец метки имеет тип string, он сначала преобразуется в удвоенный с помощью StringIndexer. Если столбец метки не существует в DataFrame, столбец выходной метки будет создан из указанной переменной ответа в формуле.

from pyspark.ml.feature import RFormula

rf = RFormula(formula="~ gender +  bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)

Как вы можете видеть, он гораздо более лаконичен, но сложнее его сочинять не требует особых настроек. Тем не менее, результат для такого простого конвейера будет таким же:

final_df_rf.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+


final_df.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+

По поводу ваших вопросов:

создать UDF с похожей функциональностью, которую я могу использовать в запросе Spark SQL (или другим способом, я полагаю)

Это просто UDF, как и любой другой. Убедитесь, что вы используете поддерживаемые типы, кроме того, все должно работать нормально.

взять RDD, полученный из карты, описанной выше, и добавить его в качестве нового столбца в фрейм данных user_data?

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField

schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)

Примечание:

Для Spark 1.x замените pyspark.ml.linalg на pyspark.mllib.linalg.