Как создать собственный кодировщик в наборах Spark 2.X?
Набор данных Spark отходит от строки до Encoder
для примитивов Pojo. Механизм Catalyst
использует ExpressionEncoder
для преобразования столбцов в выражение SQL. Однако не существуют другие подклассы Encoder
, доступные для использования в качестве шаблона для наших собственных реализаций.
Вот пример кода, который счастлив в Spark 1.X/DataFrames, который не компилируется в новом режиме:
//mapping each row to RDD tuple
df.map(row => {
var id: String = if (!has_id) "" else row.getAs[String]("id")
var label: String = row.getAs[String]("label")
val channels : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
val height : Int = if (!has_height) 0 else row.getAs[Int]("height")
val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
val data : Array[Byte] = row.getAs[Any]("data") match {
case str: String => str.getBytes
case arr: Array[[email protected]] => arr
case _ => {
log.error("Unsupport value type")
null
}
}
(id, label, channels, height, width, data)
}).persist(StorageLevel.DISK_ONLY)
}
Мы получаем ошибку компилятора
Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported
by importing spark.implicits._ Support for serializing other types will be added in future releases.
df.map(row => {
^
Итак, почему-то/где-то должно быть средство
- Определить/реализовать наш пользовательский Encoder
- Примените его при выполнении сопоставления на
DataFrame
(теперь это набор данных типа Row
)
- Зарегистрируйте кодировщик для использования другим пользовательским кодом.
Я ищу код, который успешно выполняет эти шаги.
Ответы
Ответ 1
Насколько мне известно, ничего не изменилось с тех пор, как версии 1.6 и решения, описанные в Как сохранить пользовательские объекты в Dataset?, являются единственными доступными параметрами. Тем не менее, ваш текущий код должен отлично работать с кодами по умолчанию для типов продуктов.
Чтобы понять, почему ваш код работал в 1.x и может не работать в 2.0.0, вам нужно будет проверить подписи. В 1.x DataFrame.map
- метод, который принимает функцию Row => T
и преобразует RDD[Row]
в RDD[T]
.
В 2.0.0 DataFrame.map
также используется функция типа Row => T
, но преобразуется Dataset[Row]
(a.k.a DataFrame
) в Dataset[T]
, поэтому T
требует Encoder
. Если вы хотите получить "старое" поведение, вы должны явно использовать RDD
:
df.rdd.map(row => ???)
Ответ 2
Вы импортировали неявные кодеры?
import spark.implicits._
http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.sql.Encoder