Создайте собственный трансформатор в PySpark ML

Я новичок в Spark SQL DataFrames и ML на них (PySpark). Как я могу создать токенизатор костюма, который, например, удаляет стоп-слова и использует некоторые библиотеки из ? Могу ли я увеличить значение по умолчанию?

Спасибо.

Ответы

Ответ 1

Могу ли я продлить один по умолчанию?

На самом деле, нет. По умолчанию Tokenizer является подклассом pyspark.ml.wrapper.JavaTransformer и, как и другие трансфромеры и оценщики из pyspark.ml.feature, делегирует фактическую обработку своему аналогу Scala. Поскольку вы хотите использовать Python, вы должны напрямую расширить pyspark.ml.pipeline.Transformer.

import nltk

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "")
        self._setDefault(stopwords=set())
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        self._paramMap[self.stopwords] = value
        return self

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        stopwords = self.getStopwords()

        def f(s):
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        t = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, t)(in_col))

Пример использования (данные из ML - Особенности):

sentenceDataFrame = spark.createDataFrame([
  (0, "Hi I heard about Spark"),
  (0, "I wish Java could use case classes"),
  (1, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",  
    stopwords=set(nltk.corpus.stopwords.words('english')))

tokenizer.transform(sentenceDataFrame).show()

Для пользовательского Python Estimator см. Как бросить пользовательский оценщик в PySpark mllib

Answer Этот ответ зависит от внутреннего API и совместим с Spark 2.0.3, 2.1.1, 2.2.0 или более поздней версией (SPARK-19348). Код, совместимый с предыдущими версиями Spark, см. В редакции 8.

Ответ 2

@dmbaker ваша реализация не работает для Python 3+.