Ответ 1
Проблема здесь связана с несериализуемостью класса avro.Schema, используемого в Job. Исключение возникает при попытке ссылаться на объект схемы из кода внутри функции карты.
Например, если вы попытаетесь сделать следующее, вы получите исключение "Задача не сериализуемая":
val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
// reference to the schema object declared outside
val record = new GenericData.Record(schema)
})
Вы можете заставить все работать, просто создав новый экземпляр схемы внутри функционального блока:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it for other purposes
...
rdd.map(t => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
val record = new GenericData.Record(innserSchema)
...
})
Поскольку вам не нужно анализировать схему avro для каждой записи, которую вы обрабатываете, лучшим решением будет анализ схемы на уровне раздела. Также работает следующее:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it for other purposes
...
rdd.mapPartitions(tuples => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
tuples.map(t => {
val record = new GenericData.Record(innserSchema)
...
// this closure will be bundled together with the outer one
// (no serialization issues)
})
})
Приведенный выше код работает до тех пор, пока вы предоставляете переносную ссылку на файл jsonSchema, поскольку функция карты будет выполняться несколькими удаленными исполнителями. Это может быть ссылка на файл в HDFS или его можно упаковать вместе с приложением в JAR (вы будете использовать функции класса-загрузчика, чтобы получить его содержимое в последнем случае).
Для тех, кто пытается использовать Avro с Spark, обратите внимание, что все еще есть нерешенные проблемы компиляции, и вы должны использовать следующий импорт на Maven POM:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<dependency>
Обратите внимание на классификатор "hadoop2"
. Вы можете отслеживать проблему на https://issues.apache.org/jira/browse/SPARK-3039.