Понимание кэширования искры
Я пытаюсь понять, как работает Spark cache.
Вот мое наивное понимание, пожалуйста, дайте мне знать, если я что-то упустил:
val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
В приведенном выше примере rdd1 будет загружаться с диска (например, HDFS) только один раз. (при сохранении rdd2 я предполагаю), а затем из кеша (при условии, что имеется достаточное количество ОЗУ), когда сохраняется rdd3)
Теперь вот мой вопрос. Скажем, я хочу кэшировать rdd2 и rdd3, поскольку они оба будут использоваться позже, но мне не нужно rdd1 после их создания.
В основном существует дублирование, не так ли? Поскольку, как только rdd2 и rdd3 вычисляются, мне больше не нужен rdd1, я, вероятно, должен его отменить, не так ли? вопрос в том, когда?
Будет ли это работать? (Вариант A)
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
Является ли искра добавлением негерметичного вызова в DAG? или это делается немедленно? если это будет сделано немедленно, то в основном rdd1 будет не кэшироваться, когда я прочитаю из rdd2 и rdd3, правильно?
Должен ли я сделать это вместо этого (вариант B)?
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
rdd1.unpersist()
Итак, вопрос в следующем:
Вариант А достаточно хорош? т.е. будет ли rdd1
загружать файл только один раз?
Или мне нужно пойти с вариантом B?
Ответы
Ответ 1
Казалось бы, требуется вариант B. Причина связана с тем, как Spare пытается выполнить сохранение/кеш и непервис. Поскольку преобразования RDD просто создают описания DAG без выполнения, в Варианте A к моменту, когда вы вызываете unpersist, у вас все еще есть только описания заданий, а не выполнение.
Это актуально, потому что вызов cache
или persist
просто добавляет RDD к карте RDD, которые помечали, что они сохраняются во время выполнения задания. Тем не менее, unpersist
непосредственно сообщает blockManager об исключении RDD из хранилища и удаляет ссылку на карте постоянных RDD.
сохранить функцию
функция непервистов
Итак, вам нужно будет вызвать непервист после того, как Spark выполнит и сохранит RDD с менеджером блоков.
Комментарии для метода RDD.persist
указывают на это:
rdd.persist
Ответ 2
В опции A вы не показывались, когда вы вызываете действие (вызов для сохранения)
val rdd1 = sc.textFile("some data")
rdd.cache() //marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
Если последовательность такая, как указано выше, в опции A должна использоваться кешированная версия rdd1 для вычисления как rdd2, так и rdd 3