Ответ 1
Есть несколько важных различий, но фундаментальное - это то, что происходит с родословной. Persist
/cache
сохраняет целостность линии, а checkpoint
ломает линию. Рассмотрим следующие примеры:
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
-
cache
/Persist
:val indCache = rdd.mapValues(_ > 4) indCache.persist(StorageLevel.DISK_ONLY) indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated] indCache.count // 3 indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] // | CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
-
checkpoint
:val indChk = rdd.mapValues(_ > 4) indChk.checkpoint // indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] // | ShuffledRDD[3] at reduceByKey at <console>:21 [] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [] indChk.count // 3 indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] // | ReliableCheckpointRDD[12] at count at <console>:27 []
Как вы можете видеть в первом случае, линия сохраняется, даже если данные извлекаются из кеша. Это означает, что данные могут быть пересчитаны с нуля, если некоторые разделы indCache
будут потеряны. Во втором случае линейка полностью потеряна после контрольной точки, а indChk
не несет информацию, необходимую для ее перестройки.
checkpoint
, в отличие от cache
/Persist
вычисляется отдельно от других заданий. Для этого необходимо кэшировать RDD, отмеченный для контрольной точки:
Настоятельно рекомендуется, чтобы этот RDD сохранялся в памяти, иначе сохранение его в файле потребует перерасчета.
Наконец checkpointed
данные сохраняются и не удаляются после уничтожения SparkContext
.
Что касается хранения данных SparkContext.setCheckpointDir
, используемого RDD.checkpoint
, требуется DFS
path, если он работает в нелокальном режиме. В противном случае это может быть и локальная файловая система. localCheckpoint
и Persist
без репликации следует использовать локальную файловую систему.
Примечание
Контрольная точка RDD - это другое понятие, чем чек-точка в Spark Streaming. Первый из них предназначен для решения проблемы линейки, последняя касается надежности потоковой передачи и восстановления отказа.