Java.lang.ClassCastException с использованием лямбда-выражений в искровом задании на удаленном сервере

Я пытаюсь создать веб-api для своих исправных работ apache с использованием рамки sparkjava.com. Мой код:

@Override
public void init() {
    get("/hello",
            (req, res) -> {
                String sourcePath = "hdfs://spark:54310/input/*";

                SparkConf conf = new SparkConf().setAppName("LineCount");
                conf.setJars(new String[] { "/home/sam/resin-4.0.42/webapps/test.war" });
                File configFile = new File("config.properties");

                String sparkURI = "spark://hamrah:7077";

                conf.setMaster(sparkURI);
                conf.set("spark.driver.allowMultipleContexts", "true");
                JavaSparkContext sc = new JavaSparkContext(conf);

                @SuppressWarnings("resource")
                JavaRDD<String> log = sc.textFile(sourcePath);

                JavaRDD<String> lines = log.filter(x -> {
                    return true;
                });

                return lines.count();
            });
}

Если я удаляю лямбда-выражение или помещаю его в простую банку, а не в веб-службу (как-то сервлет), она будет работать без какой-либо ошибки. Но использование выражения lambda внутри сервлета приведет к этому исключению:

15/01/28 10:36:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hamrah): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1999)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

P.S: Я попробовал сочетание трикотажа и javaspark с пристанью, комом и смолой, и все они привели меня к такому же результату.

Ответы

Ответ 1

Что вы здесь, это последующая ошибка, которая маскирует исходную ошибку.

Когда экземпляры лямбда сериализованы, они используют writeReplace для растворения специфических JRE реализация из постоянной формы, которая является SerializedLambda пример. Когда экземпляр SerializedLambda был восстановлен, его метод readResolve будет вызван для воссоздания соответствующего экземпляра лямбды. Как говорится в документации, он будет делать это, вызывая специальный метод класса, который определил исходную лямбда (см. Также этот ответ). Важным моментом является то, что исходный класс необходим, и это то, что отсутствует в вашем случае.

Но theres a... special... поведение ObjectInputStream. Когда он сталкивается с исключением, он не может сразу же выручить. Он будет записывать исключение и продолжать процесс, отмечая, что все объекты, которые в настоящее время читаются, в зависимости от ошибочного объекта также ошибочны. Только в конце процесса он будет генерировать исходное исключение, с которым он столкнулся. Что делает его настолько странным, так это то, что он также будет продолжать пытаться установить поля этого объекта. Но когда вы смотрите на метод ObjectInputStream.readOrdinaryObject строка 1806:

…
    if (obj != null &&
        handles.lookupException(passHandle) == null &&
        desc.hasReadResolveMethod())
    {
        Object rep = desc.invokeReadResolve(obj);
        if (unshared && rep.getClass().isArray()) {
            rep = cloneArray(rep);
        }
        if (rep != obj) {
            handles.setObject(passHandle, obj = rep);
        }
    }

    return obj;
}

вы видите, что он не вызывает метод readResolve, когда lookupException сообщает об исключении null. Но когда замены не произошло, не рекомендуется начинать пытаться установить значения полей реферера, но это именно то, что происходит здесь, следовательно, создавая ClassCastException.

Вы можете легко воспроизвести проблему:

public class Holder implements Serializable {
    Runnable r;
}
public class Defining {
    public static Holder get() {
        final Holder holder = new Holder();
        holder.r=(Runnable&Serializable)()->{};
        return holder;
    }
}
public class Writing {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException {
        try(FileOutputStream os=new FileOutputStream(f);
            ObjectOutputStream   oos=new ObjectOutputStream(os)) {
            oos.writeObject(Defining.get());
        }
        System.out.println("written to "+f);
    }
}
public class Reading {
    static final File f=new File(System.getProperty("java.io.tmpdir"), "x.ser");
    public static void main(String... arg) throws IOException, ClassNotFoundException {
        try(FileInputStream is=new FileInputStream(f);
            ObjectInputStream ois=new ObjectInputStream(is)) {
            Holder h=(Holder)ois.readObject();
            System.out.println(h.r);
            h.r.run();
        }
        System.out.println("read from "+f);
    }
}

Скомпилируйте эти четыре класса и запустите Writing. Затем удалите файл класса Defining.class и запустите Reading. Затем вы получите

Exception in thread "main" java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field test.Holder.r of type java.lang.Runnable in instance of test.Holder
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)

(Протестировано с 1.8.0_20)


Суть в том, что вы можете забыть об этой проблеме Serialization, когда поняли, что происходит, все, что вам нужно сделать для решения вашей проблемы, - это убедиться, что класс, который определил выражение лямбда, также доступен во время выполнения, лямбда десериализуется.

Пример для Spark Job для запуска непосредственно из IDE (по умолчанию исправить-submit распределяет банку):

SparkConf sconf = new SparkConf()
  .set("spark.eventLog.dir", "hdfs://nn:8020/user/spark/applicationHistory")
  .set("spark.eventLog.enabled", "true")
  .setJars(new String[]{"/path/to/jar/with/your/class.jar"})
  .setMaster("spark://spark.standalone.uri:7077");

Ответ 2

Я полагаю, что ваша проблема не удалась автоматически. В коде

x -> {
      return true;
}

вы проходите (String->boolean) lambda (это Predicate<String>), а метод фильтра принимает (String->boolean) лямбда (это Function<String,Boolean>). Поэтому я предлагаю вам изменить код на

x -> {
      return Boolean.TRUE;
}

Включите детали в свой вопрос, пожалуйста. Вывод из uname -a и java -version оценивается. Предоставьте sscce, если это возможно.

Ответ 3

У меня была такая же ошибка, и я заменил лямбду внутренним классом, затем он сработал. Я действительно не понимаю, почему, и воспроизведение этой ошибки было чрезвычайно сложно (у нас был один сервер, который показал поведение, и нигде больше).

Вызывает проблемы с сериализацией (использует lambdas, вызывает ошибку SerializedLambda)

this.variable = () -> { ..... }

Допустимые значения java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field MyObject.val$variable

Работы

this.variable = new MyInterface() {
    public void myMethod() {
       .....
    }
};

Ответ 4

Возможно, вы можете просто скомпилировать свою лямбду Java8 с помощью spark.scala.Function

заменить

output = rdds.map(x->this.function(x)).collect()

с:

output = rdds.map(new Function<Double,Double>(){

   public Double call(Double x){
       return MyClass.this.function(x);
   }

}).collect();

Ответ 5

у меня точно такая же проблема, у меня работает простой код: transactions = data3.map((Function<String, List<String>>) line → Arrays.asList(line.split(" "))); FPGrowth fpg = new FPGrowth().setMinSupport(minSupport).setNumPartitions(10); FPGrowthModel<String> model2 = fpg.run(transactions); transactions = data3.map((Function<String, List<String>>) line → Arrays.asList(line.split(" "))); FPGrowth fpg = new FPGrowth().setMinSupport(minSupport).setNumPartitions(10); FPGrowthModel<String> model2 = fpg.run(transactions);

и я получаю эту ошибку:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.fun$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1

моя проблема в том, что я использовал setjars с файлом jar вывода проекта, но это не решает эту проблему, кто-нибудь нашел решение для этого? где находится файл лямбда-фляги, который нужно загрузить?