Почему Spark хуже работает при использовании сериализации Kryo?

Я включил сериализацию Kryo для моего задания Spark, включил параметр, требующий регистрации, и обеспечил регистрацию всех моих типов.

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
conf.registerKryoClasses(classes)
conf.registerAvroSchemas(avroSchemas: _*)

Производительность Wallclock-времени работы ухудшилась примерно на 20%, а количество перетасованных байтов увеличилось почти на 400%.

Мне кажется, что это действительно удивительно, учитывая документацию Spark о том, что Kryo должен быть лучше.

Kryo значительно быстрее и компактнее, чем Java-сериализация (часто до 10 раз)

Я вручную вывел метод serialize для экземпляров Spark org.apache.spark.serializer.KryoSerializer и org.apache.spark.serializer.JavaSerializer с примером моих данных. Результаты были согласуются с предложениями в документации Spark: Kryo выпустил 98 байт; Java произвела 993 байта. Это действительно 10-кратное улучшение.

Возможно, смешающим фактором является то, что объекты, которые сериализуются и перетасовываются, реализуют интерфейс Avro GenericRecord. Я попытался зарегистрировать схемы Avro в SparkConf, но это не улучшилось.

Я попробовал сделать новые классы для перетасовки данных, которые были простыми Scala case class es, не включая какие-либо машины Avro. Это не улучшило производительность перемешивания или количество обмениваемых байтов.

Код искры заканчивается следующим образом:

case class A(
    f1: Long,
    f2: Option[Long],
    f3: Int,
    f4: Int,
    f5: Option[String],
    f6: Option[Int],
    f7: Option[String],
    f8: Option[Int],
    f9: Option[Int],
    f10: Option[Int],
    f11: Option[Int],
    f12: String,
    f13: Option[Double],
    f14: Option[Int],
    f15: Option[Double],
    f16: Option[Double],
    f17: List[String],
    f18: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(f: Int) : AnyRef = ???
  def put(f: Int, value: Any) : Unit = ???
  def getSchema(): org.apache.avro.Schema = A.SCHEMA$
}
object A extends AnyRef with Serializable {
  val SCHEMA$: org.apache.avro.Schema = ???
}

case class B(
    f1: Long
    f2: Long
    f3: String
    f4: String) extends org.apache.avro.specific.SpecificRecordBase {
  def get(field$ : Int) : AnyRef = ???
  def getSchema() : org.apache.avro.Schema = B.SCHEMA$
  def put(field$ : Int, value : Any) : Unit = ???
}
object B extends AnyRef with Serializable {
  val SCHEMA$ : org.apache.avro.Schema = ???
}

def join(as: RDD[A], bs: RDD[B]): (Iterable[A], Iterable[B]) = {
  val joined = as.map(a => a.f1 -> a) cogroup bs.map(b => b.f1 -> b)
  joined.map { case (_, asAndBs) => asAndBs }
}

Есть ли у вас какая-либо идея, что может произойти или как я могу получить лучшую производительность, которая должна быть доступна от Kryo?

Ответы

Ответ 1

Если ваш размер одной записи слишком мал и количество записей в огромном количестве может замедлить работу. Постарайтесь увеличить размер буфера и посмотреть, не улучшилось ли оно.

Попробуйте выполнить приведенный ниже, если это уже не сделано.

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Now it 24 Mb of buffer by default instead of 0.064 Mb
  .set("spark.kryoserializer.buffer.mb","24") 

Ссылка: https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/

Ответ 2

Поскольку у вас большие RDD с кардинальной передачей, соединение хеш-трансляции/вещания, похоже, будет, к сожалению, недоступным.

Лучше всего объединить() ваши RDD перед присоединением. Вы видите высокий перекос во время перетасовки? Если это так, вы можете захотеть объединиться с shuffle = true.

Наконец, если у вас есть RDD вложенных структур (например, JSON), которые иногда позволят вам обходить тасования. Подробнее см. Слайды и/или видео здесь.