Ответ 1
Первая проблема может быть решена с помощью типизированных столбцов вплоть до конца (KeyValueGroupedDataset.agg
ожидает TypedColumn(-s)
)
Вы можете определить результат агрегации как:
val eight = lit(8.0)
.as[Double] // Not necessary
val sumByEight = typedSum[MyClass](_.c4)
.divide(eight)
.as[Double] // Required
.name("div(sum(c4), 8)")
и подключите его к следующему коду:
val myCaseClass = Seq(
MyClass("a", "b", "c", 2.0),
MyClass("a", "b", "c", 3.0)
).toDS
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(sumByEight)
чтобы получить
+-------+---------------+
| key|div(sum(c4), 8)|
+-------+---------------+
|[a,b,c]| 0.625|
+-------+---------------+
Вторая проблема - результат использования класса, который не соответствует форме данных. Правильное представление может быть:
case class AnotherClass(key: (String, String, String), sum: Double)
который используется с данными, определенными выше:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.agg(typedSum[MyClass](_.c4).name("sum"))
.as[AnotherClass]
даст:
+-------+---+
| key|sum|
+-------+---+
|[a,b,c]|5.0|
+-------+---+
но .as[AnotherClass]
здесь не требуется, если Dataset[((String, String, String), Double)]
является приемлемым.
Конечно, вы можете пропустить все это и просто mapGroups
(хотя и не без снижения производительности):
import shapeless.syntax.std.tuple._ // A little bit of shapeless
val tuples = myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.mapGroups((group, iter) => group :+ iter.map(_.c4).sum)
с результатом
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
| a| b| c|5.0|
+---+---+---+---+
reduceGroups
может быть лучшим вариантом:
myCaseClass
.groupByKey(myCaseClass => (myCaseClass.c1, myCaseClass.c2, myCaseClass.c3))
.reduceGroups((x, y) => x.copy(c4=x.c4 + y.c4))
с результирующим Dataset
:
+-------+-----------+
| _1| _2|
+-------+-----------+
|[a,b,c]|[a,b,c,5.0]|
+-------+-----------+