Использование reduceByKey в Apache Spark (Scala)
У меня есть список Tuples типа: (идентификатор пользователя, имя, счет).
Например,
val x = sc.parallelize(List(
("a", "b", 1),
("a", "b", 1),
("c", "b", 1),
("a", "d", 1))
)
Я пытаюсь уменьшить эту коллекцию до типа, в котором каждый
имя элемента подсчитывается.
Итак, в приведенном выше val x преобразуется в:
(a,ArrayBuffer((d,1), (b,2)))
(c,ArrayBuffer((b,1)))
Вот код, который я использую в настоящее время:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
val grouped = byKey.groupByKey
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey
grouped2.foreach(println)
Я пытаюсь использовать reduceByKey, поскольку он работает быстрее, чем groupByKey.
Как можно уменьшитьByKey вместо кода выше, чтобы обеспечить
то же отображение?
Ответы
Ответ 1
Следуя вашему коду:
val byKey = x.map({case (id,uri,count) => (id,uri)->count})
Вы можете сделать:
val reducedByKey = byKey.reduceByKey(_ + _)
scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)
PairRDDFunctions[K,V].reduceByKey
принимает ассоциативную функцию сокращения, которая может быть применена к типу V RDD [(K, V)]. Другими словами, вам нужна функция f[V](e1:V, e2:V) : V
. В этом конкретном случае с суммой на Ints: (x:Int, y:Int) => x+y
или _ + _
в короткой форме подчеркивания.
Для записи: reduceByKey
работает лучше, чем groupByKey
, потому что он должен применять функцию уменьшения локально до фазы перетасовки/уменьшения. groupByKey
заставит перетасовать все элементы перед группировкой.
Ответ 2
Структура источника данных: RDD [(String, String, Int)] и reduceByKey
может использоваться только в том случае, если структура данных RDD [(K, V)].
val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)
Ответ 3
Синтаксис ниже:
reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V],
который говорит, что для того же ключа в RDD он принимает значения (которые будут определенно одного типа) выполняет операцию, предоставляемую как часть функции, и возвращает значение того же типа, что и для родительского RDD.