Искровое потоковое восстановление контрольной точки очень медленное
- Цель: прочитать с Kinesis и сохранить данные в S3 в формате Паркета с помощью искрового потока.
- Ситуация:
Сначала приложение выполняется нормально, количество партий составляет 1 час, а время обработки составляет менее 30 минут. По какой-то причине скажем, что приложение выходит из строя, и мы пытаемся перезапустить его с контрольной точки. Обработка теперь берет навсегда и не движется вперед.
Мы попытались проверить одно и то же с интервалом в 1 минуту, обработка проходит нормально и занимает 1-2 минуты для завершения партии. Когда мы возвращаемся с контрольной точки, для каждой партии требуется около 15 минут.
- Примечания:
мы используем s3 для контрольных точек
используя 1 исполнитель, с 19g mem и 3 ядрами на исполнителя
Присоединение скриншотов:
Первый запуск - до восстановления контрольной точки
![Перед контрольной точкой - потоковая страница]()
![Перед контрольной точкой - Страница вакансий]()
![Перед контрольной точкой - Jobs page2]()
Попытка восстановления с контрольной точки:
![После контрольной точки - Страница вакансий]()
Config.scala
object Config {
val sparkConf = new SparkConf
val sc = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
val eventsS3Path = sc.hadoopConfiguration.get("eventsS3Path")
val useIAMInstanceRole = sc.hadoopConfiguration.getBoolean("useIAMInstanceRole",true)
val checkpointDirectory = sc.hadoopConfiguration.get("checkpointDirectory")
// sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
DateTimeZone.setDefault(DateTimeZone.forID("America/Los_Angeles"))
val numStreams = 2
def getSparkContext(): SparkContext = {
this.sc
}
def getSqlContext(): HiveContext = {
this.sqlContext
}
}
S3Basin.scala
object S3Basin {
def main(args: Array[String]): Unit = {
Kinesis.startStreaming(s3basinFunction _)
}
def s3basinFunction(streams : DStream[Array[Byte]]): Unit ={
streams.foreachRDD(jsonRDDRaw =>{
println(s"Old partitions ${jsonRDDRaw.partitions.length}")
val jsonRDD = jsonRDDRaw.coalesce(10,true)
println(s"New partitions ${jsonRDD.partitions.length}")
if(!jsonRDD.isEmpty()){
val sqlContext = SQLContext.getOrCreate(jsonRDD.context)
sqlContext.read.json(jsonRDD.map(f=>{
val str = new String(f)
if(str.startsWith("{\"message\"")){
str.substring(11,str.indexOf("@version")-2)
}
else{
str
}
})).registerTempTable("events")
sqlContext.sql(
"""
|select
|to_date(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_date,
|hour(from_utc_timestamp(from_unixtime(at), 'US/Pacific')) as event_hour,
|*
|from events
""".stripMargin).coalesce(1).write.mode(SaveMode.Append).partitionBy("event_date", "event_hour","verb").parquet(Config.eventsS3Path)
sqlContext.dropTempTable("events")
}
})
}
}
Kinesis.scala
object Kinesis{
def functionToCreateContext(streamFunc: (DStream[Array[Byte]]) => Unit): StreamingContext = {
val streamingContext = new StreamingContext(Config.sc, Minutes(Config.sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))) // new context
streamingContext.checkpoint(Config.checkpointDirectory) // set checkpoint directory
val sc = Config.getSparkContext
var awsCredentails : BasicAWSCredentials = null
val kinesisClient = if(Config.useIAMInstanceRole){
new AmazonKinesisClient()
}
else{
awsCredentails = new BasicAWSCredentials(sc.hadoopConfiguration.get("kinesis.awsAccessKeyId"),sc.hadoopConfiguration.get("kinesis.awsSecretAccessKey"))
new AmazonKinesisClient(awsCredentails)
}
val endpointUrl = sc.hadoopConfiguration.get("kinesis.endpointUrl")
val appName = sc.hadoopConfiguration.get("kinesis.appName")
val streamName = sc.hadoopConfiguration.get("kinesis.streamName")
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val batchInterval = Minutes(sc.hadoopConfiguration.getInt("kinesis.StreamingBatchDuration",1))
// Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information
// on sequence number of records that have been received. Same as batchInterval for this
// example.
val kinesisCheckpointInterval = batchInterval
// Get the region name from the endpoint URL to save Kinesis Client Library metadata in
// DynamoDB of the same region as the Kinesis stream
val regionName = sc.hadoopConfiguration.get("kinesis.regionName")
val kinesisStreams = (0 until Config.numStreams).map { i =>
println(s"creating stream for $i")
if(Config.useIAMInstanceRole){
KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}else{
KinesisUtils.createStream(streamingContext, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.TRIM_HORIZON, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,awsCredentails.getAWSAccessKeyId,awsCredentails.getAWSSecretKey)
}
}
val unionStreams = streamingContext.union(kinesisStreams)
streamFunc(unionStreams)
streamingContext
}
def startStreaming(streamFunc: (DStream[Array[Byte]]) => Unit) = {
val sc = Config.getSparkContext
if(sc.defaultParallelism < Config.numStreams+1){
throw new Exception(s"Number of shards = ${Config.numStreams} , number of processor = ${sc.defaultParallelism}")
}
val streamingContext = StreamingContext.getOrCreate(Config.checkpointDirectory, () => functionToCreateContext(streamFunc))
// sys.ShutdownHookThread {
// println("Gracefully stopping Spark Streaming Application")
// streamingContext.stop(true, true)
// println("Application stopped greacefully")
// }
//
streamingContext.start()
streamingContext.awaitTermination()
}
}
DAG
![DAG]()
![введите описание изображения здесь]()
Ответы
Ответ 1
поднял вопрос о Jira: https://issues.apache.org/jira/browse/SPARK-19304
Проблема заключается в том, что мы читаем больше данных за итерацию, чем требуется, а затем отбрасываем данные. Этого можно избежать, добавив ограничение на вызов getResults
aws.
Исправлено: https://github.com/apache/spark/pull/16842
Ответ 2
Когда неудавшийся драйвер перезагружается, происходит следующее:
- Восстановление вычислений. Контрольная информация используется для
перезапустите драйвер, восстановите контексты и перезапустите все
приемников.
- Восстановить метаданные блока. Метаданные всех блоков, которые будут
необходимое для продолжения обработки, будет восстановлено.
- Сгенерировать неполные задания - для партий с обработкой, которая
не завершился из-за сбоя, RDD и соответствующие
задания восстанавливаются с использованием восстановленных метаданных блока.
- Прочтите блок, сохраненный в журналах. Когда эти задания выполняются,
данные блоков считываются непосредственно из журналов записи вперед. Это восстанавливает
все необходимые данные, которые были надежно сохранены в журналах.
- Повторно отправить неподтвержденные данные. Буферизованные данные, которые не были сохранены в
журнал в момент сбоя будет отправлен снова источником. в виде
он не был подтвержден получателем.
Поскольку все эти шаги выполняются водителю, ваша партия из 0 событий занимает так много времени. Это должно произойти с первой партией, тогда все будет нормально.
Ссылка здесь.
Ответ 3
У меня были подобные проблемы раньше, мое приложение все медленнее и медленнее.
попытайтесь освободить память после использования rdd, вызовите rdd.unpersist()
https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#unpersist(boolean)
или spark.streaming.backpressure.enabled
до true
http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-interval
http://spark.apache.org/docs/latest/streaming-programming-guide.html#requirements
также проверьте настройки locality
, возможно, слишком много данных перемещается.