Ответ 1
Это прекрасный пример для оконных операторов (с использованием функции over
) или join
.
Поскольку вы уже поняли, как использовать окна, я сосредоточен исключительно на join
.
scala> val inventory = Seq(
| ("a", "pref1", "b", 168),
| ("a", "pref3", "h", 168),
| ("a", "pref3", "t", 63)).toDF("uid", "k", "v", "count")
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields]
scala> val maxCount = inventory.groupBy("uid", "k").max("count")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]
scala> maxCount.show
+---+-----+----------+
|uid| k|max(count)|
+---+-----+----------+
| a|pref3| 168|
| a|pref1| 168|
+---+-----+----------+
scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max")
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field]
scala> maxCount.show
+---+-----+---+
|uid| k|max|
+---+-----+---+
| a|pref3|168|
| a|pref1|168|
+---+-----+---+
scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show
+---+-----+---+---+-----+
|uid| k|max| v|count|
+---+-----+---+---+-----+
| a|pref3|168| h| 168|
| a|pref1|168| b| 168|
+---+-----+---+---+-----+