Как создать собственный кодировщик в наборах Spark 2.X?

Набор данных Spark отходит от строки до Encoder для примитивов Pojo. Механизм Catalyst использует ExpressionEncoder для преобразования столбцов в выражение SQL. Однако не существуют другие подклассы Encoder, доступные для использования в качестве шаблона для наших собственных реализаций.

Вот пример кода, который счастлив в Spark 1.X/DataFrames, который не компилируется в новом режиме:

//mapping each row to RDD tuple
df.map(row => {
    var id: String = if (!has_id) "" else row.getAs[String]("id")
    var label: String = row.getAs[String]("label")
    val channels  : Int = if (!has_channels) 0 else row.getAs[Int]("channels")
    val height  : Int = if (!has_height) 0 else row.getAs[Int]("height")
    val width : Int = if (!has_width) 0 else row.getAs[Int]("width")
    val data : Array[Byte] = row.getAs[Any]("data") match {
      case str: String => str.getBytes
      case arr: Array[[email protected]] => arr
      case _ => {
        log.error("Unsupport value type")
        null
      }
    }
    (id, label, channels, height, width, data)
  }).persist(StorageLevel.DISK_ONLY)

}

Мы получаем ошибку компилятора

Error:(56, 11) Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are supported 
by importing spark.implicits._  Support for serializing other types will be added in future releases.
    df.map(row => {
          ^

Итак, почему-то/где-то должно быть средство

  • Определить/реализовать наш пользовательский Encoder
  • Примените его при выполнении сопоставления на DataFrame (теперь это набор данных типа Row)
  • Зарегистрируйте кодировщик для использования другим пользовательским кодом.

Я ищу код, который успешно выполняет эти шаги.

Ответы

Ответ 1

Насколько мне известно, ничего не изменилось с тех пор, как версии 1.6 и решения, описанные в Как сохранить пользовательские объекты в Dataset?, являются единственными доступными параметрами. Тем не менее, ваш текущий код должен отлично работать с кодами по умолчанию для типов продуктов.

Чтобы понять, почему ваш код работал в 1.x и может не работать в 2.0.0, вам нужно будет проверить подписи. В 1.x DataFrame.map - метод, который принимает функцию Row => T и преобразует RDD[Row] в RDD[T].

В 2.0.0 DataFrame.map также используется функция типа Row => T, но преобразуется Dataset[Row] (a.k.a DataFrame) в Dataset[T], поэтому T требует Encoder. Если вы хотите получить "старое" поведение, вы должны явно использовать RDD:

df.rdd.map(row => ???)