Как получить эквивалент row_number SQL для Spark RDD?
Мне нужно создать полный список row_numbers для таблицы данных со многими столбцами.
В SQL это будет выглядеть так:
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
Теперь скажем, что в Spark у меня есть RDD формы (K, V), где V = (col1, col2, col3), поэтому мои записи похожи на
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
Я хочу заказать их с помощью таких команд, как sortBy(), sortWith(), sortByKey(), zipWithIndex и т.д., и иметь новый RDD с правильным номером_строки
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(меня не интересуют круглые скобки, поэтому форма также может быть (K, (col1, col2, col3, rownum)))
Как это сделать?
Здесь моя первая попытка:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
Также обратите внимание, что функция sortBy не может быть применена непосредственно к RDD, но сначала нужно запустить collect(), а затем вывод не является RDD, но массив
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
Здесь немного больше прогресса, но он еще не разбит на разделы:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
Ответы
Ответ 1
Функциональность row_number() over (partition by ... order by ...)
была добавлена в Spark 1.4. Этот ответ использует PySpark/DataFrames.
Создать тест DataFrame:
from pyspark.sql import Row, functions as F
testDF = sc.parallelize(
(Row(k="key1", v=(1,2,3)),
Row(k="key1", v=(1,4,7)),
Row(k="key1", v=(2,2,3)),
Row(k="key2", v=(5,5,5)),
Row(k="key2", v=(5,5,9)),
Row(k="key2", v=(7,5,5))
)
).toDF()
Добавьте номер разделенной строки:
from pyspark.sql.window import Window
(testDF
.select("k", "v",
F.rowNumber()
.over(Window
.partitionBy("k")
.orderBy("k")
)
.alias("rowNum")
)
.show()
)
+----+-------+------+
| k| v|rowNum|
+----+-------+------+
|key1|[1,2,3]| 1|
|key1|[1,4,7]| 2|
|key1|[2,2,3]| 3|
|key2|[5,5,5]| 1|
|key2|[5,5,9]| 2|
|key2|[7,5,5]| 3|
+----+-------+------+
Ответ 2
Это интересная проблема, которую вы поднимаете. Я отвечу на Python, но я уверен, что вы сможете легко перевести на Scala.
Вот как я бы справился с этим:
1- Упрощение ваших данных:
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
temp2 теперь является "реальной" парой ключ-значение. Это выглядит так:
[
((3, 4), (5, 5, 5)),
((3, 4), (5, 5, 9)),
((3, 4), (7, 5, 5)),
((1, 2), (1, 2, 3)),
((1, 2), (1, 4, 7)),
((1, 2), (2, 2, 3))
]
2- Затем используйте групповую функцию для воспроизведения эффекта PARTITION BY:
temp3 = temp2.groupByKey()
temp3 теперь представляет собой RDD с двумя строками:
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
3 Теперь вам нужно применить ранговую функцию для каждого значения RDD. В python я бы использовал простую сортированную функцию (перечисление создаст ваш столбец row_number):
temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
Обратите внимание, что для реализации вашего конкретного порядка вам нужно будет подать правильный аргумент "ключ" (в python я бы просто создал лямбда-функцию, такую как:
lambda tuple : (tuple[0],-tuple[1],tuple[2])
В конце (без функции ключевого аргумента это выглядит так):
[
((1, 2), ((1, 2, 3), 0)),
((1, 2), ((1, 4, 7), 1)),
((1, 2), ((2, 2, 3), 2)),
((3, 4), ((5, 5, 5), 0)),
((3, 4), ((5, 5, 9), 1)),
((3, 4), ((7, 5, 5), 2))
]
Надеюсь, что это поможет!
Удачи.