Интеграция Spark структурированной потоковой передачи с реестром схемы Confluent

Я использую источник Кафки в Spark Structured Streaming для получения записей Avro в кодировке Confluent. Я намерен использовать реестр Confluent Schema, но интеграция с потоковой структурой с искрой кажется невозможной.

Я видел этот вопрос, но не смог заставить его работать с реестром схемы Confluent. Чтение сообщений Avro от Kafka с помощью Spark 2.0.2 (структурированная потоковая передача)

Ответы

Ответ 1

Поскольку другой ответ, который был в основном полезен, был удален, я хотел добавить его повторно с некоторыми реорганизациями и комментариями.

Вот необходимые зависимости.

     <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions> 
                <!-- Conflicts with Spark version -->
                <exclusion> 
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
     </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

А вот реализация Scala (тестируется только локально на master=local[*])

В первом разделе определите импорт, некоторые поля и несколько вспомогательных методов для получения схем

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  }

  def avroSchemaToSparkSchema(avroSchema: String) = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

Затем определите простой метод main, который анализирует аргументы CMD, чтобы получить подробности Kafka

  def main(args: Array[String]): Unit = {
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  }

Затем важный метод, который использует тему Кафки и десериализует ее

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = {
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    }

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  }

Анализатор командной строки позволяет передавать серверы начальной загрузки, реестр схем, имя темы и мастер Spark.

  private def parseArg(args: Array[String]): CommandLine = {
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  }

Чтобы описанная выше функция UDF работала, необходимо создать десериализатор, который переносит байты данных в кадр, содержащий десериализованное Avro.

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val value = super.deserialize(bytes)
      value match {
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      }
    }
  }

}

Соедините каждый из этих блоков, и он будет работать в IntelliJ после добавления -b localhost:9092 -s http://localhost:8081 -t myTopic в меню "Запустить конфигурации"> "Аргументы программы".

Ответ 2

У меня ушло пару месяцев на чтение исходного кода и тестирование. В двух словах, Spark может обрабатывать только сериализацию String и Binary. Вы должны вручную десериализовать данные. В spark создайте объект службы слияния с остальными, чтобы получить схему. Преобразуйте строку схемы в объекте ответа в схему Avro с помощью анализатора Avro. Далее прочитайте тему Кафки как обычно. Затем сопоставьте столбец "значение" двоичного типа с помощью Confluent KafkaAvroDeSerializer. Я настоятельно рекомендую войти в исходный код для этих классов, потому что здесь много чего происходит, поэтому для краткости опущу много деталей.

//Used Confluent version 3.2.2 to write this. 
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = sql.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map{
  row =>
    if (keyDeserializer == null) {
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    }
    if (valueDeserializer == null) {
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    }

    //Pass the Avro schema.
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.key, keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.value, topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}

val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()

Ответ 3

Эта библиотека выполнит эту работу за вас. Он подключается к Confluent Kafka и Schema Registry через Spark Structured Stream.

Для Confluent он справляется с идентификатором схемы, который отправляется вместе с полезной нагрузкой.

В README вы найдете фрагмент кода о том, как это сделать.

РАСКРЫТИЕ: Я работаю над ABSA, и я разработал эту библиотеку.

Ответ 4

Для тех, кто хочет сделать это в pyspark: библиотека, на которую ссылался felipe, раньше хорошо работала на JVM для меня, поэтому я написал небольшую функцию-обертку, которая интегрирует ее в python. Это выглядит очень странно, потому что многие типы, неявные в языке scala, должны быть явно указаны в py4j. До сих пор работал хорошо, хотя, даже в свече 2.4.1.

def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    naming_strategy = getattr(
        getattr(j.za.co.absa.abris.avro.read.confluent.SchemaManager,
                "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
    conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
    schema_path = j.scala.Option.apply(None)
    conf = j.scala.Option.apply(conf)
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    data_frame = DataFrame(data_frame, sql_context)
    return data_frame

Чтобы это работало, вы должны добавить библиотеку в пакеты spark, например

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1,' \
    'org.apache.spark:spark-avro_2.11:2.4.1,' \
    'za.co.absa:abris_2.11:2.2.2 ' \
    '--repositories https://packages.confluent.io/maven/ ' \
    'pyspark-shell'

Ответ 5

Блоки данных теперь предоставляют эту функциональность, но вы должны платить за это :-(

dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

См. Https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html для получения дополнительной информации.

Хорошая бесплатная альтернатива - это ABRIS. См. Https://github.com/AbsaOSS/ABRiS. Единственный недостаток, который мы видим, это то, что вам нужно предоставить файл вашей схемы avro во время выполнения, чтобы инфраструктура могла применить эту схему к вашему фрейму данных, прежде чем опубликовать его в теме Kafka.,

Ответ 6

Проверьте этот проект - https://github.com/hortonworks-spark/spark-schema-registry

Это позволяет интегрировать реестр Hortonwork Schema (https://github.com/hortonworks/registry) со Spark. Также возможно включить это в реестр Confluent Schema (поскольку реестр Hortonworks Schema совместим с Confluent), но вам нужно будет изучить его дальше.