Ответ 1
Одним из возможных улучшений является создание собственного Transformer
, который будет обрабатывать нормализацию Unicode и соответствующую оболочку Python. Это должно снизить общую нагрузку при передаче данных между JVM и Python и не требует каких-либо изменений в самой Spark или доступа к частному API.
На стороне JVM вам понадобится трансформатор, похожий на этот:
package net.zero323.spark.ml.feature
import java.text.Normalizer
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.{DataType, StringType}
class UnicodeNormalizer (override val uid: String)
extends UnaryTransformer[String, String, UnicodeNormalizer] {
def this() = this(Identifiable.randomUID("unicode_normalizer"))
private val forms = Map(
"NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
"NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD
)
val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",
ParamValidators.inArray(forms.keys.toArray))
def setN(value: String): this.type = set(form, value)
def getForm: String = $(form)
setDefault(form -> "NFKD")
override protected def createTransformFunc: String => String = {
val normalizerForm = forms($(form))
(s: String) => Normalizer.normalize(s, normalizerForm)
}
override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType, s"Input type must be string type but got $inputType.")
}
override protected def outputDataType: DataType = StringType
}
Соответствующее определение сборки (настройте версии Spark и Scala в соответствии с вашим развертыванием Spark):
name := "unicode-normalization"
version := "1.0"
crossScalaVersions := Seq("2.11.12", "2.12.8")
organization := "net.zero323"
val sparkVersion = "2.4.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
На стороне Python вам понадобится оболочка, похожая на эту.
from pyspark.ml.param.shared import *
# from pyspark.ml.util import keyword_only # in Spark < 2.0
from pyspark import keyword_only
from pyspark.ml.wrapper import JavaTransformer
class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):
@keyword_only
def __init__(self, form="NFKD", inputCol=None, outputCol=None):
super(UnicodeNormalizer, self).__init__()
self._java_obj = self._new_java_obj(
"net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid)
self.form = Param(self, "form",
"unicode form (one of NFC, NFD, NFKC, NFKD)")
# kwargs = self.__init__._input_kwargs # in Spark < 2.0
kwargs = self._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, form="NFKD", inputCol=None, outputCol=None):
# kwargs = self.setParams._input_kwargs # in Spark < 2.0
kwargs = self._input_kwargs
return self._set(**kwargs)
def setForm(self, value):
return self._set(form=value)
def getForm(self):
return self.getOrDefault(self.form)
Сборка пакета Scala:
sbt +package
включите его при запуске оболочки или отправке. Например, для сборки Spark с Scala 2.11:
bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar \
--driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar
и ты должен быть готов к работе. Все, что осталось, это немного магии регулярных выражений:
from pyspark.sql.functions import regexp_replace
normalizer = UnicodeNormalizer(form="NFKD",
inputCol="text", outputCol="text_normalized")
df = sc.parallelize([
(1, "Maracaibó"), (2, "New York"),
(3, " São Paulo "), (4, "~Madrid")
]).toDF(["id", "text"])
(normalizer
.transform(df)
.select(regexp_replace("text_normalized", "\p{M}", ""))
.show())
## +--------------------------------------+
## |regexp_replace(text_normalized,\p{M},)|
## +--------------------------------------+
## | Maracaibo|
## | New York|
## | Sao Paulo |
## | ~Madrid|
## +--------------------------------------+
Обратите внимание, что это соответствует тем же соглашениям, что и встроенные преобразователи текста, и не является нулевым. Вы можете легко исправить это путем проверки на null
в createTransformFunc
.