Ответ 1
Решение в вопросе - лучшее, что я мог получить. Единственным улучшением будет cache
входного набора данных, чтобы избежать двойного сканирования, т.е.
mydf.cache
pivot_udf(mydf,'price','units').show()
Мне нужно развернуть более одного столбца в фреймворке pyspark. Пример кадра данных,
>>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
>>> mydf = spark.createDataFrame(d,['id','day','price','units'])
>>> mydf.show()
+---+---+-----+-----+
| id|day|price|units|
+---+---+-----+-----+
|100| 1| 23| 10|
|100| 2| 45| 11|
|100| 3| 67| 12|
|100| 4| 78| 13|
|101| 1| 23| 10|
|101| 2| 45| 13|
|101| 3| 67| 14|
|101| 4| 78| 15|
|102| 1| 23| 10|
|102| 2| 45| 11|
|102| 3| 67| 16|
|102| 4| 78| 18|
+---+---+-----+-----+
Теперь, если мне нужно получить столбец цены в строке для каждого идентификатора на основе дня, тогда я могу использовать метод pivot as,
>>> pvtdf = mydf.withColumn('combcol',F.concat(F.lit('price_'),mydf['day'])).groupby('id').pivot('combcol').agg(F.first('price'))
>>> pvtdf.show()
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|100| 23| 45| 67| 78|
|101| 23| 45| 67| 78|
|102| 23| 45| 67| 78|
+---+-------+-------+-------+-------+
поэтому, когда мне нужен столбцы единиц, а также для переноса в качестве цены, либо мне нужно создать еще один фрейм данных, как указано выше для единиц, а затем присоединиться к использованию с использованием id.But, когда у меня больше столбцов как таковых, я попробовал функцию для этого,
>>> def pivot_udf(df,*cols):
... mydf = df.select('id').drop_duplicates()
... for c in cols:
... mydf = mydf.join(df.withColumn('combcol',F.concat(F.lit('{}_'.format(c)),df['day'])).groupby('id').pivot('combcol').agg(F.first(c)),'id')
... return mydf
...
>>> pivot_udf(mydf,'price','units').show()
+---+-------+-------+-------+-------+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|units_1|units_2|units_3|units_4|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
|100| 23| 45| 67| 78| 10| 11| 12| 13|
|101| 23| 45| 67| 78| 10| 13| 14| 15|
|102| 23| 45| 67| 78| 10| 11| 16| 18|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
Нуждайтесь в предложениях, если это хорошая практика, и если любой другой лучший способ это сделать. Спасибо заранее!
Решение в вопросе - лучшее, что я мог получить. Единственным улучшением будет cache
входного набора данных, чтобы избежать двойного сканирования, т.е.
mydf.cache
pivot_udf(mydf,'price','units').show()
Как и в версии с искровым 1.6, я считаю, что единственный способ, потому что pivot занимает только один столбец, и есть второе значение атрибута, на котором вы можете передать различные значения этого столбца, которые заставят ваш код работать быстрее, потому что в противном случае искра должна запускаться это для вас, так что да, это правильный способ сделать это.
Здесь не-UDF-способ, включающий одну сводную точку (следовательно, просто сканирование одного столбца для определения всех уникальных дат).
mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'),F.first('units').alias('unit'))
Вот результат (извинения за несоответствующий порядок и наименование):
+---+-------+------+-------+------+-------+------+-------+------+
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100| 23| 10| 45| 11| 67| 12| 78| 13|
|101| 23| 10| 45| 13| 67| 14| 78| 15|
|102| 23| 10| 45| 11| 67| 16| 78| 18|
+---+-------+------+-------+------+-------+------+-------+------+
Мы просто агрегируем как по price
и по столбцу unit
после разворота в день.
Я думаю, что вы можете создать объединенный столбец (объединяющий все столбцы, к которым нужно повернуть) и использовать сводку к объединенному столбцу.