Прокрутка собственного сокращенияByKey в Spark Dataset

Я пытаюсь научиться использовать DataFrames и DataSets в дополнение к RDD. Для RDD я знаю, что могу сделать someRDD.reduceByKey((x,y) => x + y), но я не вижу эту функцию для Dataset. Поэтому я решил написать одно.

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})

Однако это возвращает все драйверу. Как вы могли бы написать это, чтобы вернуть a Dataset? Может быть, mapPartition и сделать это там?

Обратите внимание, что это компилируется, но не запускается, потому что у него нет кодеров для Map еще

Ответы

Ответ 1

Я предполагаю, что ваша цель - перевести эту идиому на наборы данных:

rdd.map(x => (x.someKey, x.someField))
   .reduceByKey(_ + _)

// => returning an RDD of (KeyType, FieldType)

В настоящее время самое близкое решение, которое я нашел с API-интерфейсом Dataset, выглядит следующим образом:

ds.map(x => (x.someKey, x.someField))          // [1]
  .groupByKey(_._1)                            
  .reduceGroups((a, b) => (a._1, a._2 + b._2))
  .map(_._2)                                   // [2]

// => returning a Dataset of (KeyType, FieldType)

// Comments:
// [1] As far as I can see, having a map before groupByKey is required
//     to end up with the proper type in reduceGroups. After all, we do
//     not want to reduce over the original type, but the FieldType.
// [2] required since reduceGroups converts back to Dataset[(K, V)]
//     not knowing that our V are already key-value pairs.

Не выглядит очень элегантно, и, согласно быстрому эталону, он также намного менее эффективен, поэтому, возможно, мы здесь что-то не хватает...

Примечание. Альтернативой может быть использование groupByKey(_.someKey) в качестве первого шага. Проблема в том, что использование groupByKey изменяет тип с регулярного Dataset на KeyValueGroupedDataset. Последний не имеет регулярной функции map. Вместо этого он предлагает mapGroups, который не кажется очень удобным, потому что он переносит значения в Iterator и выполняет тасование в соответствии с docstring.

Ответ 2

Более эффективное решение использует mapPartitions до groupByKey, чтобы уменьшить количество перетасовки (обратите внимание, что это не та же самая подпись, что и reduceByKey, но я считаю, что более гибко передавать функцию, чем требовать набора данных кортежа).

def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
  (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
  def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
    iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
  }
  ds.mapPartitions(h(f, g, _))
    .groupByKey(f)(encK)
    .reduceGroups(g)
}

В зависимости от формы/размера ваших данных это составляет 1 секунду от производительности reduceByKey и около 2x с точностью . По-прежнему есть возможности для улучшения, поэтому предложения будут приветствоваться.