Spark: получить верхний N ключом

Скажем, у меня есть PairRDD как таковой (очевидно, что гораздо больше данных в реальной жизни, предполагайте миллионы записей):

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2)
))

Каков наиболее эффективный способ создания RDD с верхним 2 баллы за ключ?

val top2ByKey = ...
res3: Array[(String, Int)] = Array((a,4), (a,3), (b,4), (b,3))

Ответы

Ответ 1

Я думаю, что это должно быть достаточно эффективным:

Отредактировано в соответствии с комментариями OP:

scores.mapValues(p => (p, p)).reduceByKey((u, v) => {
  val values = List(u._1, u._2, v._1, v._2).sorted(Ordering[Int].reverse).distinct
  if (values.size > 1) (values(0), values(1))
  else (values(0), values(0))
}).collect().foreach(println)

Ответ 3

Немного изменены ваши входные данные.

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2),
      ("a", 6),
      ("b", 8)
    ))

Я объясняю, как это сделать шаг за шагом:

1.Группа по ключу для создания массива

scores.groupByKey().foreach(println)  

Результат:

(b,CompactBuffer(3, 1, 4, 2, 8))
(a,CompactBuffer(1, 2, 3, 4, 6))

Как вы видите, каждое значение само по себе является массивом чисел. CompactBuffer - это просто оптимизированный массив.

2. Для каждого ключа, обратный список сортировки чисел, которые содержат

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse)} ).foreach(println)

Результат:

(b,List(8, 4, 3, 2, 1))
(a,List(6, 4, 3, 2, 1))

3. Сохраните только первые 2 элемента с 2-го шага, они будут топ-2 баллы в списке

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).foreach(println)

Результат:

(a,List(6, 4))
(b,List(8, 4))

4.Flat-карта для создания нового парного RDD для каждого ключа и верхнего балла

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).flatMap({case (k, numbers) => numbers.map(k -> _)}).foreach(println)

Результат:

(b,8)
(b,4)
(a,6)
(a,4)

5.Optional step - сортировка по ключу, если вы хотите

scores.groupByKey().map({ case (k, numbers) => k -> numbers.toList.sorted(Ordering[Int].reverse).take(2)} ).flatMap({case (k, numbers) => numbers.map(k -> _)}).sortByKey(false).foreach(println)

Результат:

(a,6)
(a,4)
(b,8)
(b,4)

Надеюсь, это объяснение помогло понять логику.

Ответ 4

 scores.reduceByKey(_ + _).map(x => x._2 -> x._1).sortByKey(false).map(x => x._2 -> x._1).take(2).foreach(println)