Как выполнить одну операцию для каждого исполнителя однажды в искровом режиме
У меня есть модель weka, которая хранится в S3 размером около 400 МБ. Теперь у меня есть набор записей, на которых я хочу запустить модель и выполнить предсказание.
Для выполнения предсказания, что я пробовал,
-
Загрузите и загрузите модель в качестве статического объекта, передайте ее всем исполнителям. Выполните операцию карты по предсказанию RDD. → Не работает, как в Weka для выполнения прогноза, объект модели должен быть изменен, а для трансляции требуется копия только для чтения.
-
Загрузите и загрузите модель в качестве статического объекта и отправьте ее исполнителю в каждой операции с картой. -→ Работа (неэффективна, как и в каждой операции с картой, я прохожу 400 МБ объекта)
-
Загрузите модель на драйвер и загрузите ее на каждом исполнителе и кешируйте ее там. (Не знаю, как это сделать)
Кто-нибудь знает, как я могу загрузить модель для каждого исполнителя один раз и кэшировать ее так, чтобы для других записей я не загружал ее снова.
Благодаря Нехе
Ответы
Ответ 1
У вас есть два варианта:
1. Создайте одноэлементный объект с ленивым val, представляющим данные:
object WekaModel {
lazy val data = {
// initialize data here. This will only happen once per JVM process
}
}
Затем вы можете использовать ленивый val в вашей функции map
. lazy val
гарантирует, что каждый рабочий JVM инициализирует свой собственный экземпляр данных. Нет сериализации или трансляций не будет выполняться для data
.
elementsRDD.map { element =>
// use WekaModel.data here
}
преимущества
- более эффективен, так как он позволяет вам инициализировать данные один раз на экземпляр JVM. Этот подход является хорошим выбором при необходимости инициализировать пул соединений с базой данных, например.
Недостатки
- Меньший контроль за инициализацией. Например, сложнее инициализировать объект, если вам нужны параметры времени выполнения.
- Вы не можете действительно освободить или освободить объект, если вам нужно. Обычно это приемлемо, поскольку ОС освобождает ресурсы при выходе из процесса.
2. Используйте метод mapPartition
(или foreachPartition
) на RDD, а не только map
.
Это позволяет вам инициализировать все, что вам нужно для всего раздела.
elementsRDD.mapPartition { elements =>
val model = new WekaModel()
elements.map { element =>
// use model and element. there is a single instance of model per partition.
}
}
Преимущества:
- Обеспечивает большую гибкость при инициализации и деинициализации объектов.
Недостатки
- Каждый раздел будет создавать и инициализировать новый экземпляр вашего объекта. В зависимости от того, сколько разделов у вас есть на одном экземпляре JVM, это может быть или не быть проблемой.
Ответ 2
Здесь то, что работало для меня даже лучше, чем ленивый инициализатор. Я создал указатель уровня объекта, инициализированный нулем, и пусть каждый исполнитель инициализирует его. В блоке инициализации вы можете иметь код запуска. Обратите внимание, что каждая партия обработки перезагружает локальные переменные, но не объекты уровня объекта.
object Thing1 {
var bigObject : BigObject = null
def main(args: Array[String]) : Unit = {
val sc = <spark/scala magic here>
sc.textFile(infile).map(line => {
if (bigObject == null) {
// this takes a minute but runs just once
bigObject = new BigObject(parameters)
}
bigObject.transform(line)
})
}
}
Этот подход создает ровно один большой объект для каждого исполнителя, а не один большой объект на раздел других подходов.
Если вы поместите var bigObject: BigObject = null в пространство имен основных функций, это ведет себя по-разному. В этом случае он запускает конструктор bigObject в начале каждого раздела (т.е. Пакет). Если у вас есть утечка памяти, это в конечном итоге уничтожит исполнителя. Сбор мусора также должен будет сделать больше работы.