Использовать схему для преобразования сообщений AVRO с помощью Spark в DataFrame
Есть ли способ использовать схему для преобразования avro из kafka с spark до dataframe? Файл схемы для записей пользователя:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
И фрагменты кода из Пример SqlNetworkWordCount и Kafka, Spark и Avro - Часть 3, Создание и потребление сообщений Avro для чтения в сообщениях.
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
case class User(firstName: String, lastName: String)
Как-то я не могу найти другой способ, чем использовать класс case для преобразования AVRO-сообщений в DataFrame. Есть ли возможность использовать схему вместо этого? Я использую Spark 1.6.2
и Kafka 0.10
.
Полный код, если вы заинтересованы.
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
object ReadMessagesFromKafka {
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
def main(args: Array[String]) {
val brokers = "127.0.0.1:9092"
val topics = "test"
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
Ответы
Ответ 1
OP, вероятно, решила проблему, но для будущей ссылки я решил эту проблему в целом, поэтому подумал, что это может быть полезно для публикации здесь.
Итак, вы должны преобразовать схему Avro в искровой StructType, а также преобразовать объект, который у вас есть в RDD, в строку [Any], а затем использовать:
spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType>
Чтобы преобразовать схему Avro, я использовал spark-avro следующим образом:
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
Преобразование RDD было более сложным. Если ваша схема проста, вы, вероятно, можете просто сделать простую карту.. что-то вроде этого:
rdd.map(obj=>{
val seq = (obj.getName(),obj.getAge()
Row.fromSeq(seq))
})
В этом примере объект имеет 2 имени и возраст полей.
Важно убедиться, что элементы в строке будут соответствовать порядку и типам полей в StructType.
В моем суточном случае у меня был гораздо более сложный объект, который я хотел обработать в целом, чтобы поддерживать будущие изменения схемы, поэтому мой код был намного сложнее.
метод, предложенный OP, должен также работать над некоторыми случаями, но будет сложно предположить сложные объекты (не примитивные или case-class)
еще один совет: если у вас есть класс внутри класса, вы должны преобразовать этот класс в строку, чтобы класс упаковки был преобразован в нечто вроде:
Row(Any,Any,Any,Row,...)
вы также можете посмотреть проект spark-avro, о котором я упоминал ранее, о том, как преобразовать объекты в строки. Я использовал часть логики там
Если кто-то, читающий это, нуждается в дальнейшей помощи, спросите меня в комментариях, и я постараюсь помочь
Ответ 2
Я работал над подобной проблемой, но на Java. Поэтому не уверен в Scala, но посмотрите на библиотеку com.databricks.spark.avro(https://github.com/databricks/spark-avro). Надеюсь, что это поможет.
Ответ 3
Пожалуйста, взгляните на это
https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/AvroSuite.scala
Итак, вместо
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString,records.get("lastName").toString)).toDF()
вы можете попробовать это
val df = spark.read.avro(message._2.get)
Ответ 4
Для всех, кто интересуется обработкой этого способа, который может обрабатывать изменения схемы без необходимости останавливать и повторно развертывать ваше искровое приложение (если ваша логика приложения может справиться с этим) см. этот вопрос .