Spark - Задача не сериализуема: как работать со сложными закрытиями карт, вызывающими внешние классы/объекты?
Взгляните на этот вопрос: Scala + Spark - Задача не сериализуема: java.io.NotSerializableExceptionon. При вызове функции снаружи закрывается только для классов, а не объектов.
Проблема:
Предположим, что мои mappers могут быть функциями (def), которые внутренне вызывают другие классы и создают объекты и делают разные вещи внутри. (Или они могут даже быть классами, которые расширяют (Foo) = > Bar, и выполняют обработку в методе их применения, но пусть теперь игнорируют этот случай)
Spark поддерживает только Java Serialization для закрытий. Есть ли выход из этого? Можем ли мы использовать что-то вместо закрытия, чтобы делать то, что я хочу сделать? Мы можем легко делать такие вещи с Hadoop. Это единственное, что делает Искра почти непригодной для меня. Нельзя ожидать, что все сторонние библиотеки будут иметь все классы, расширяющие Serializable!
Возможные решения:
Кажется, что-то похожее на это: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Конечно, похоже, что оболочка - это ответ, но я не вижу точно, как это сделать.
Ответы
Ответ 1
Я понял, как это сделать сам!
Вам просто нужно сериализовать объекты перед прохождением через закрытие и затем де-сериализовать. Этот подход просто работает, даже если ваши классы не являются Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это карри.;)
Вот пример того, как я это сделал:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Не стесняйтесь сделать Blah настолько сложным, насколько вы хотите, класс, сопутствующий объект, вложенные классы, ссылки на несколько сторонних библиотек.
KryoSerializationWrapper ссылается на: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Ответ 2
В случае использования Java API вы должны избегать анонимного класса при переходе к закрытию функции сопоставления. Вместо того, чтобы делать карту (новая функция), вам нужен класс, который расширяет вашу функцию и передает ее на карту (..)
Видеть:
https://yanago.wordpress.com/2015/03/21/apache-spark/