Spark: Запись в файл Avro

Я в Spark, у меня есть RDD из файла Avro. Теперь я хочу сделать некоторые преобразования на этом RDD и сохранить его как файл Avro:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath, 
  classOf[AvroKey[GenericRecord]], 
  classOf[org.apache.hadoop.io.NullWritable], 
  classOf[AvroKeyOutputFormat[GenericRecord]], 
  job.getConfiguration)

При запуске этого Spark жалуется, что Schema $recordSchema не является сериализуемым.

Если я раскомментирую вызов .map(и просто имею rdd.saveAsNewAPIHadoopFile), вызов будет успешным.

Что я здесь делаю неправильно?

Любая идея?

Ответы

Ответ 1

Проблема здесь связана с несериализуемостью класса avro.Schema, используемого в Job. Исключение возникает при попытке ссылаться на объект схемы из кода внутри функции карты.

Например, если вы попытаетесь сделать следующее, вы получите исключение "Задача не сериализуемая":

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // reference to the schema object declared outside
  val record = new GenericData.Record(schema)
})

Вы можете заставить все работать, просто создав новый экземпляр схемы внутри функционального блока:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it for other purposes
...
rdd.map(t => {
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innserSchema)
  ...
})

Поскольку вам не нужно анализировать схему avro для каждой записи, которую вы обрабатываете, лучшим решением будет анализ схемы на уровне раздела. Также работает следующее:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => {
    val record = new GenericData.Record(innserSchema)
    ...
    // this closure will be bundled together with the outer one 
    // (no serialization issues)
  })
})

Приведенный выше код работает до тех пор, пока вы предоставляете переносную ссылку на файл jsonSchema, поскольку функция карты будет выполняться несколькими удаленными исполнителями. Это может быть ссылка на файл в HDFS или его можно упаковать вместе с приложением в JAR (вы будете использовать функции класса-загрузчика, чтобы получить его содержимое в последнем случае).

Для тех, кто пытается использовать Avro с Spark, обратите внимание, что все еще есть нерешенные проблемы компиляции, и вы должны использовать следующий импорт на Maven POM:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
<dependency>

Обратите внимание на классификатор "hadoop2". Вы можете отслеживать проблему на https://issues.apache.org/jira/browse/SPARK-3039.

Ответ 2

Сериализатором по умолчанию, используемым Spark, является сериализация Java. Поэтому для всех типов java он попытается выполнить сериализацию с использованием сериализации Java. AvroKey не является сериализуемым, поэтому вы получаете ошибки.

Вы можете использовать KryoSerializer или плагин в своей пользовательской сериализации (например, Avro). Здесь вы можете узнать больше о сериализации. http://spark-project.org/docs/latest/tuning.html

Вы также можете обернуть свой объект чем-то внешним. Проверьте, например, SparkFlumeEvent, который включает AvroFlumeEvent здесь: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala