Инициализировать RDD для пустых

У меня есть RDD, называемый

JavaPairRDD<String, List<String>> existingRDD; 

Теперь мне нужно инициализировать этот existingRDD пустым, так что, когда я получу фактическое rdd, я могу сделать объединение с этим existingRDD. Как инициализировать existingRDD на пустой RDD, кроме инициализации его до нуля? Вот мой код:

JavaPairRDD<String, List<String>> existingRDD;
if(ai.get()%10==0)
{
    existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/",
    NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten                                  
}
else
{
    existingRDD.union(rdd);
}

Ответы

Ответ 1

Я все еще не уверен в том, что вы пытаетесь сделать, но вы можете создать пустой RDD, как следует:

// Get an RDD that has no partitions or elements.
JavaRDD<T> emptyRDD = sc.emptyRDD() 

Надеюсь, вы знаете, как использовать дженерики, в противном случае это:

JavaRDD<Tuple2<String,List<String>>> emptyRDD = sc.emptyRDD();
JavaPairRDD<String,List<String>> emptyPairRDD = JavaPairRDD.fromJavaRDD(
  existingRDD
);

Вы также можете использовать метод mapToPair для преобразования JavaRDD в JavaPairRDD.

Решение в scala:

scala> val emptyRDD = sc.emptyRDD
// emptyRDD: org.apache.spark.rdd.EmptyRDD[Nothing] = EmptyRDD[1] at ...

Ответ 2

В scala я использовал команду "parallelize".

val emptyRDD = sc.parallelize(Seq(""))

Ответ 3

@eliasah ответ очень полезен, я предоставляю код для создания пустой пары RDD. Рассмотрим сценарий, в котором требуется создать пустую пару RDD (ключ, значение). Следующий scala код иллюстрирует, как создать пустую пару RDD с ключом как String и значение как Int.

type pairRDD = (String,Int)
var resultRDD = sparkContext.emptyRDD[pairRDD]

RDD будет создан следующим образом:

resultRDD: org.apache.spark.rdd.EmptyRDD[(String, Int)] = EmptyRDD[0] at emptyRDD at <console>:29

Ответ 4

В Java создание пустого RDD было немного сложным. Я попытался использовать scala.reflect.classTag, но он не работает. После многих тестов код, который работал, был еще более простым.

private JavaRDD<Foo> getEmptyJavaRdd() {

/* this code does not compile because require <T> as parameter into emptyRDD */
//        JavaRDD<Foo> emptyRDD = sparkContext.emptyRDD();
//        return emptyRDD;

/* this should be the solution that try to emulate the scala <T> */
/* but i could not make it work too */
//        ClassTag<Foo> tag = scala.reflect.ClassTag$.MODULE$.apply(Foo.class);
//        return sparkContext.emptyRDD(tag);

/* this alternative worked into java 8 */
    return SparkContext.parallelize(
            java.util.Arrays.asList()
    );

}