Задача не сериализуема: java.io.NotSerializableException при вызове функции закрытие только для классов не объектов
Получение странного поведения при вызове функции за пределами замыкания:
- когда функция находится в объекте, все работает
- когда функция находится в классе get:
Задача не сериализуема: java.io.NotSerializableException: тестирование
Проблема в том, что мне нужен мой код в классе, а не в объекте. Любая идея, почему это происходит? Является ли объект Scala сериализованным (по умолчанию?)?
Это пример рабочего кода:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
Это нерабочий пример:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
Ответы
Ответ 1
Я не думаю, что другой ответ полностью правильный. RDD действительно сериализуемы, так что это не то, что приводит к сбою вашей задачи.
Spark - это распределенный вычислительный движок, а его основная абстракция - это гибкий распределенный набор данных (RDD), который можно рассматривать как распределенную коллекцию. В принципе, элементы RDD разбиваются по узлам кластера, но Spark абстрагирует это от пользователя, позволяя пользователю взаимодействовать с RDD (коллекцией), как если бы он был локальным.
Не вдаваться в слишком много деталей, но когда вы запускаете различные преобразования на RDD (map
, flatMap
, filter
и другие), ваш код преобразования (закрытия):
- сериализован в драйвере node,
- отправляется в соответствующие узлы кластера,
- десериализации,
- и, наконец, выполняется на узлах
Вы можете, конечно, запустить это локально (как в вашем примере), но все эти фазы (помимо доставки по сети) все еще происходят. [Это позволяет вам улавливать любые ошибки даже до развертывания на производстве]
Что происходит во втором случае, так это то, что вы вызываете метод, определенный в классе testing
изнутри функции карты. Spark видит это, и поскольку методы не могут быть сериализованы самостоятельно, Spark пытается сериализовать весь класс testing
, так что код будет работать, когда выполняется в другой JVM. У вас есть две возможности:
Либо вы выполняете сериализацию классов, поэтому весь класс может быть сериализован Spark:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
или вы вместо функции someFunc
выполняете функцию (функции - объекты в Scala), поэтому Spark сможет сериализовать ее:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new Test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
Аналогичная, но не та же проблема с сериализацией классов может вас заинтересовать, и вы можете прочитать на ней в этой презентации Spark Summit 2013.
Как примечание, вы можете переписать rddList.map(someFunc(_))
на rddList.map(someFunc)
, они точно такие же. Обычно второй предпочтительнее, поскольку он менее подробный и более чистый для чтения.
EDIT (2015-03-15): SPARK-5307 представил SerializationDebugger, а Spark 1.3.0 - первая версия для использования, Он добавляет путь сериализации к NotSerializableException. Когда встречается исключение NotSerializableException, отладчик посещает граф объектов, чтобы найти путь к объекту, который не может быть сериализован, и создает информацию, чтобы помочь пользователю найти объект.
В случае OP это то, что печатается в stdout:
Serialization stack:
- object not serializable (class: testing, value: [email protected])
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
Ответ 2
Ответ Grega отлично объясняет, почему исходный код не работает, и два способа исправить проблему. Однако это решение не очень гибко; рассмотрите случай, когда ваше закрытие включает вызов метода в классе Serializable
, который у вас нет. Вы не можете добавить тег Serializable
в этот класс или изменить базовую реализацию, чтобы изменить метод на функцию.
Nilesh представляет большой обходной путь для этого, но решение может быть сделано как более сжатым, так и общим:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
Этот функционал-сериализатор затем может использоваться для автоматического переноса замыканий и вызовов методов:
rdd map genMapper(someFunc)
Этот метод также имеет преимущество, не требуя дополнительных зависимостей Shark, чтобы получить доступ к KryoSerializationWrapper
, поскольку Twitter Chill уже вытащен ядром Spark
Ответ 3
Полный разговор, полностью объясняющий проблему, которая предлагает отличную парадигму, позволяющую избежать этих проблем сериализации: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- leaks-no-ws.md
Высший проголосовавший ответ в основном предлагает отбросить всю языковую функцию - это больше не использует методы и использует только функции. В самом деле, в методах функционального программирования в классах следует избегать, но превращение их в функции не решает проблему дизайна здесь (см. Ссылку выше).
В качестве быстрого исправления в этой конкретной ситуации вы можете просто использовать аннотацию @transient
чтобы сказать ей, чтобы она не пыталась сериализовать оскорбительное значение (здесь Spark.ctx
- это настраиваемый класс, а не Spark, следующий после имен OP):
@transient
val rddList = Spark.ctx.parallelize(list)
Вы также можете реструктурировать код, чтобы rddList проживал где-то в другом месте, но это также противно.
Будущее, вероятно, спор
В будущем Scala будет включать в себя такие вещи, которые называются "спорами", которые должны позволить нам контролировать мелкие зерна, что делает и не совсем затягивается закрытием. Кроме того, это должно привести к ошибкам случайного вытягивания несериализуемых типов (или любых нежелательных значений) в ошибки компиляции, а не в настоящее время, что является ужасным исключением/ошибками в работе.
http://docs.scala-lang.org/sips/pending/spores.html
Совет по сериализации Kryo
При использовании kyro сделайте так, чтобы регистрация была необходима, это будет означать, что вы получите ошибки вместо утечек памяти:
"Наконец, я знаю, что у kryo есть kryo.setRegistrationOptional (правда), но у меня очень трудное время, пытаясь выяснить, как его использовать. Когда этот параметр включен, kryo все еще, кажется, бросает исключения, если я не зарегистрирован классы ".
Стратегия регистрации классов с помощью крио
Конечно, это только дает вам контроль уровня на уровне, а не контроль уровня ценности.
... больше идей.
Ответ 4
Я решил эту проблему, используя другой подход. Вам просто нужно сериализовать объекты перед тем, как пройти через закрытие, и затем де-сериализовать. Этот подход просто работает, даже если ваши классы не являются Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это карри.;)
Вот пример того, как я это сделал:
def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
(foo: Foo) : Bar = {
kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()
object Blah(abc: ABC) extends (Foo => Bar) {
def apply(foo: Foo) : Bar = { //This is the real function }
}
Не стесняйтесь сделать Blah настолько сложным, насколько вы хотите, класс, сопутствующий объект, вложенные классы, ссылки на несколько сторонних библиотек.
KryoSerializationWrapper относится к: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala
Ответ 5
Я не совсем уверен, что это относится к Scala, но в Java я решил NotSerializableException
путем рефакторинга моего кода, чтобы закрытие не получило доступ к несериализуемому полю final
.
Ответ 6
Я столкнулся с подобной проблемой, и то, что я понимаю из ответа Греги,
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
ваш метод doIT пытается сериализовать метод someFunc (_), но поскольку метод не является сериализуемым, он пытается сериализовать класс тестирование который снова не сериализуем.
Итак, создайте свой код, вы должны определить someFunc внутри метода doIT. Например:
def doIT = {
def someFunc(a:Int) = a+1
//function definition
}
val after = rddList.map(someFunc(_))
after.collect().map(println(_))
}
И если в картинке присутствует несколько функций, все эти функции должны быть доступны для родительского контекста.