Понимание замыканий и их сериализации

Отказ от ответственности: просто начинайте играть с Spark.

У меня проблемы с пониманием знаменитого исключения "Задача не сериализуемая", но мой вопрос немного отличается от тех, которые я вижу на SO (или, как я думаю).

У меня крошечный пользовательский RDD (TestRDD). Он имеет поле, в котором хранятся объекты, класс которых не реализует Serializable (NonSerializable). Я установил опцию конфигурации spark.serializer для использования Kryo. Однако, когда я пытаюсь count() на моем RDD, я получаю следующее:

Caused by: java.io.NotSerializableException: com.complexible.spark.NonSerializable
Serialization stack:
- object not serializable (class: com.test.spark.NonSerializable, value: [email protected])
- field (class: com.test.spark.TestRDD, name: mNS, type: class com.test.spark.NonSerializable)
- object (class com.test.spark.TestRDD, TestRDD[1] at RDD at TestRDD.java:28)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (TestRDD[1] at RDD at TestRDD.java:28,<function2>))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1009)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:933)

Когда я заглядываю внутрь DAGScheduler.submitMissingTasks, я вижу, что он использует свой сериализатор замыкания на моем RDD, который является сериализатором Java, а не сериализатором Kryo, которого я ожидал бы. Я читал, что Kryo имеет проблемы с сериализации замыканий, и Spark всегда использует сериализатор Java для замыканий, но я не совсем понимаю, как замыкания вступают в игру здесь вообще. Все, что я здесь делаю, это:

SparkConf conf = new SparkConf()
                         .setAppName("ScanTest")
                         .setMaster("local")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

JavaSparkContext sc = new JavaSparkContext(conf);

TestRDD rdd = new TestRDD(sc.sc());
System.err.println(rdd.count());

То есть, никакие карты или что-либо, что потребует сериализации замыканий. OTOH это работает:

sc.parallelize(Arrays.asList(new NonSerializable(), new NonSerializable())).count()

Серийный анализатор Kryo используется, как и ожидалось, сериализатор замыкания не задействован. Если я не установил свойство serializer в Kryo, я тоже получил бы исключение.

Я ценю любые указатели, объясняющие, откуда происходит замыкание, и как обеспечить, чтобы я мог использовать Kryo для сериализации пользовательских RDD.

UPDATE: здесь TestRDD с его несериализуемым полем mNS:

class TestRDD extends RDD<String> {

    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class);

    NonSerializable mNS = new NonSerializable();

    public TestRDD(final SparkContext _sc) {
        super(_sc,
              JavaConversions.asScalaBuffer(Collections.<Dependency<?>>emptyList()),
              STRING_TAG);
    }

    @Override
    public Iterator<String> compute(final Partition thePartition, final TaskContext theTaskContext) {
        return JavaConverters.asScalaIteratorConverter(Arrays.asList("test_" + thePartition.index(),
                                                                     "test_" + thePartition.index(),
                                                                     "test_" + thePartition.index()).iterator()).asScala();
    }

    @Override
    public Partition[] getPartitions() {
        return new Partition[] {new TestPartition(0), new TestPartition(1), new TestPartition(2)};
    }

    static class TestPartition implements Partition {

        final int mIndex;

        public TestPartition(final int theIndex) {
            mIndex = theIndex;
        }

        public int index() {
            return mIndex;
        }
    }
}

Ответы

Ответ 1

Когда я смотрю внутрь DAGScheduler.submitMissingTasks, я вижу, что он использует его сериализатор замыкания на моем RDD, который является сериализатором Java, а не сериализатор Kryo, который я ожидаю.

SparkEnv поддерживает два сериализатора, один из которых называется serializer, который используется для сериализации ваших данных, контрольной точки, обмена сообщениями между рабочими и т.д. и доступен под флагом конфигурации spark.serializer. Другой называется closureSerializer под spark.closure.serializer, который используется для проверки того, что ваш объект фактически сериализуем и настраивается для Spark & ​​lt; = 1.6.2 (но фактически ничего не работает JavaSerializer) и жестко запрограммировано из 2.0. 0 и выше до JavaSerializer.

У сериализатора закрытия Kryo есть ошибка, которая делает ее непригодной, вы можете увидеть эту ошибку под SPARK-7708 (это может быть исправлено с помощью Kryo 3.0.0, но Spark в настоящее время фиксируется с определенной версией Chill, которая исправлена ​​на Kryo 2.2.1). Кроме того, для Spark 2.0.x теперь JavaSerializer теперь фиксируется вместо настраиваемого (вы можете увидеть его в этом запросе на растяжение). Это означает, что мы эффективно зацикливаемся на JavaSerializer для сериализации закрытия.

Является ли это странным, что мы используем один сериализатор для отправки задач и прочее для сериализации данных между рабочими и т.д.? определенно, но это то, что у нас есть.

Подводя итог, если вы настраиваете конфигурацию spark.serializer или используете SparkContext.registerKryoClasses, вы будете использовать Kryo для большей части вашей сериализации в Spark. Сказав, что для проверки того, является ли данный класс сериализуемым и сериализованным заданием для рабочих, Spark будет использовать JavaSerializer.