Spark Streaming mapWithState, похоже, периодически восстанавливает полное состояние

Я работаю над потоковым проектом Scala (2.11)/Spark (1.6.1) и используя mapWithState() для отслеживания увиденных данных из предыдущих партий.

Состояние распределено в 20 разделах на нескольких узлах, созданных с помощью StateSpec.function(trackStateFunc _).numPartitions(20). В этом состоянии у нас есть только несколько ключей (~ 100), сопоставленных с Sets с более чем 160 000 записей, которые растут во всем приложении. Все состояние до 3GB, которое может обрабатываться каждым node в кластере. В каждой партии некоторые данные добавляются в состояние, но не удаляются до самого конца процесса, т.е. ~ 15 минут.

При выполнении пользовательского интерфейса приложения каждое 10-е время обработки партии очень велико по сравнению с другими партиями. Смотрите изображения:

Шипы показывают более высокое время обработки.

Желтые поля представляют собой большое время обработки.

введите описание изображения здесь

Более подробное представление Job показывает, что в этих партиях происходит в определенную точку, точно, когда все 20 разделов "пропущены". Или это то, что говорит пользовательский интерфейс.

введите описание изображения здесь

Мое понимание skipped заключается в том, что каждый государственный раздел является одной возможной задачей, которая не выполняется, поскольку ее не нужно переучитывать. Однако я не понимаю, почему количество skips меняется в каждом задании и почему последнее задание требует такой обработки. Чем больше время обработки происходит независимо от размера штата, оно просто влияет на продолжительность.

Является ли это ошибкой в ​​функциональности mapWithState() или это предполагаемое поведение? Требует ли какая-либо перетасовка базовая структура данных, требуется ли Set в состоянии копировать данные? Или это скорее будет недостатком в моем приложении?

Ответы

Ответ 1

Является ли это ошибкой в ​​функции mapWithState() или это поведение?

Это предполагаемое поведение. Шипы, которые вы видите, - это то, что ваши данные проходят контрольную проверку в конце данной партии. Если вы заметите время на более длинных партиях, вы увидите, что это происходит постоянно каждые 100 секунд. Это связано с тем, что время контрольной точки является постоянным и рассчитывается за ваш batchDuration, а именно, как часто вы разговариваете с вашим источником данных для чтения партии, умноженной на некоторую константу, если только вы явно не устанавливаете интервал DStream.checkpoint.

Вот соответствующий фрагмент кода из MapWithStateDStream:

override def initialize(time: Time): Unit = {
  if (checkpointDuration == null) {
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
  }
  super.initialize(time)
}

Где DEFAULT_CHECKPOINT_DURATION_MULTIPLIER:

private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

Какая линия точно совпадает с поведением, которое вы видите, поскольку ваша продолжительность сеанса чтения составляет каждые 10 секунд = > 10 * 10 = 100 секунд.

Это нормально, и это стоимость сохраняющегося состояния с Spark. Оптимизация на вашей стороне может заключаться в том, чтобы подумать о том, как вы можете минимизировать размер состояния, которое вы должны хранить в памяти, чтобы эта сериализация была как можно быстрее. Кроме того, убедитесь, что данные распределены по всему количеству исполнителей, так что состояние распределяется равномерно между всеми узлами. Кроме того, я надеюсь, что вы включили Kryo Serialization вместо стандартной сериализации Java, которая может дать вам существенное повышение производительности.

Ответ 2

В дополнение к принятому ответу, указав цену сериализации, связанную с контрольной точкой, есть еще одна, менее известная проблема, которая может повлиять на поведение spikey: выселение удаленных состояний.

В частности, состояния "удаленные" или "таймированные" не удаляются сразу с карты, а помечены для удаления и фактически удаляются только в процессе сериализации [в Spark 1.6.1, см. writeObjectInternal()].

Это имеет два значения производительности, которые происходят только один раз за 10 партий:

  • Процесс обхода и удаления имеет свою цену
  • Если вы обрабатываете поток событий с тайм-аутом/удалением, например. сохраняйте его на внешнем хранилище, соответствующая стоимость для всех 10 партий будет выплачена только в этот момент (и не так, как можно было ожидать, на каждом RDD).