Сортировка JavaPairRDD сначала по значению, а затем по ключу
Я пытаюсь сортировать RDD по значению, и если несколько значений равны, то мне нужно эти значения лексикографически.
код:
JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {
@Override
public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < String, Long > (t._1, t._2.count);
}
});
То, что я сделал до сих пор, это использование takeOrdered
и предоставление CustomComperator
, но поскольку takeOrdered
не может обрабатывать большой объем данных, при запуске кода он продолжает выходить (он много ест памяти, которую ОС не может обрабатывать):
List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long > () {
@Override
public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < String, Long > (t._1, t._2.count);
}
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP);
Comperator:
static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable {
private static final long serialVersionUID = 1L;
private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator();
@Override
public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) {
if (o1._2.compareTo(o2._2) == 0) {
return o1._1.compareTo(o2._1);
}
return -o1._2.compareTo(o2._2);
}
}
ERROR:
16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s
Как бы вы отсортировали этот RDD? Как вы возьмете значение TopKMovies
для рассмотрения, а в случае равенства ключей - лексикографически.
Спасибо.
Ответы
Ответ 1
Решена проблема с использованием sortByKey с компаратором и разделами после сопоставления <String, Long>
PairRDD с < Tuple2<String,Long> , Long>
PairRDD
JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long > () {
@Override
public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception {
return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count);
}
}).sortByKey(new TupleMapLongComparator(), true, 100);
JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() {
@Override
public Tuple2<String, Long> call(
Tuple2<Tuple2<String, Long>, Long> t) throws Exception {
return new Tuple2 < String, Long > (t._1._1, t._1._2);
}
});
Компаратор:
private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable {
@Override
public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) {
if (tuple1._2.compareTo(tuple2._2) == 0) {
return tuple1._1.compareTo(tuple2._1);
}
return -tuple1._2.compareTo(tuple2._2);
}
}
Ответ 2
Вы попробовали вторичную сортировку в Spark?
Spark Secondary Sort