Инициализировать RDD для пустых
У меня есть RDD, называемый
JavaPairRDD<String, List<String>> existingRDD;
Теперь мне нужно инициализировать этот existingRDD
пустым, так что, когда я получу фактическое rdd, я могу сделать объединение с этим existingRDD
.
Как инициализировать existingRDD
на пустой RDD, кроме инициализации его до нуля?
Вот мой код:
JavaPairRDD<String, List<String>> existingRDD;
if(ai.get()%10==0)
{
existingRDD.saveAsNewAPIHadoopFile("s3://manthan-impala-test/kinesis-dump/" + startTime + "/" + k + "/" + System.currentTimeMillis() + "/",
NullWritable.class, Text.class, TextOutputFormat.class); //on worker failure this will get overwritten
}
else
{
existingRDD.union(rdd);
}
Ответы
Ответ 1
Я все еще не уверен в том, что вы пытаетесь сделать, но вы можете создать пустой RDD, как следует:
// Get an RDD that has no partitions or elements.
JavaRDD<T> emptyRDD = sc.emptyRDD()
Надеюсь, вы знаете, как использовать дженерики, в противном случае это:
JavaRDD<Tuple2<String,List<String>>> emptyRDD = sc.emptyRDD();
JavaPairRDD<String,List<String>> emptyPairRDD = JavaPairRDD.fromJavaRDD(
existingRDD
);
Вы также можете использовать метод mapToPair
для преобразования JavaRDD
в JavaPairRDD
.
Решение в scala:
scala> val emptyRDD = sc.emptyRDD
// emptyRDD: org.apache.spark.rdd.EmptyRDD[Nothing] = EmptyRDD[1] at ...
Ответ 2
В scala я использовал команду "parallelize".
val emptyRDD = sc.parallelize(Seq(""))
Ответ 3
@eliasah ответ очень полезен, я предоставляю код для создания пустой пары RDD. Рассмотрим сценарий, в котором требуется создать пустую пару RDD (ключ, значение). Следующий scala код иллюстрирует, как создать пустую пару RDD с ключом как String и значение как Int.
type pairRDD = (String,Int)
var resultRDD = sparkContext.emptyRDD[pairRDD]
RDD будет создан следующим образом:
resultRDD: org.apache.spark.rdd.EmptyRDD[(String, Int)] = EmptyRDD[0] at emptyRDD at <console>:29
Ответ 4
В Java создание пустого RDD было немного сложным. Я попытался использовать scala.reflect.classTag, но он не работает. После многих тестов код, который работал, был еще более простым.
private JavaRDD<Foo> getEmptyJavaRdd() {
/* this code does not compile because require <T> as parameter into emptyRDD */
// JavaRDD<Foo> emptyRDD = sparkContext.emptyRDD();
// return emptyRDD;
/* this should be the solution that try to emulate the scala <T> */
/* but i could not make it work too */
// ClassTag<Foo> tag = scala.reflect.ClassTag$.MODULE$.apply(Foo.class);
// return sparkContext.emptyRDD(tag);
/* this alternative worked into java 8 */
return SparkContext.parallelize(
java.util.Arrays.asList()
);
}