Столбец String для Pyspark Dataframe
У меня есть простой DataFrame, как это:
rdd = sc.parallelize(
[
(0, "A", 223,"201603", "PORT"),
(0, "A", 22,"201602", "PORT"),
(0, "A", 422,"201601", "DOCK"),
(1,"B", 3213,"201602", "DOCK"),
(1,"B", 3213,"201601", "PORT"),
(2,"C", 2321,"201601", "DOCK")
]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])
df_data.show()
+---+----+----+------+----+
| id|type|cost| date|ship|
+---+----+----+------+----+
| 0| A| 223|201603|PORT|
| 0| A| 22|201602|PORT|
| 0| A| 422|201601|DOCK|
| 1| B|3213|201602|DOCK|
| 1| B|3213|201601|PORT|
| 2| C|2321|201601|DOCK|
+---+----+----+------+----+
и мне нужно повернуть его по дате:
df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()
+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
| 2| C|2321.0| null| null|
| 0| A| 422.0| 22.0| 223.0|
| 1| B|3213.0|3213.0| null|
+---+----+------+------+------+
Все работает так, как ожидалось. Но теперь мне нужно свернуть его и получить нечисловой столбец:
df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()
и, конечно, я получаю исключение:
AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'
Я хотел бы сгенерировать что-то в строке
+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
| 2| C|DOCK | null| null|
| 0| A| DOCK | PORT| DOCK|
| 1| B|DOCK |PORT | null|
+---+----+------+------+------+
Возможно ли это с помощью pivot
?
Ответы
Ответ 1
Предполагая, что комбинации (id |type | date)
уникальны, и ваша единственная цель - поворот, а не агрегация, вы можете использовать first
(или любую другую функцию, не ограниченную числовыми значениями):
from pyspark.sql.functions import first
(df_data
.groupby(df_data.id, df_data.type)
.pivot("date")
.agg(first("ship"))
.show())
## +---+----+------+------+------+
## | id|type|201601|201602|201603|
## +---+----+------+------+------+
## | 2| C| DOCK| null| null|
## | 0| A| DOCK| PORT| PORT|
## | 1| B| PORT| DOCK| null|
## +---+----+------+------+------+
Если эти допущения неверны, вам придется предварительно агрегировать свои данные. Например, для наиболее распространенного значения ship
:
from pyspark.sql.functions import max, struct
(df_data
.groupby("id", "type", "date", "ship")
.count()
.groupby("id", "type")
.pivot("date")
.agg(max(struct("count", "ship")))
.show())
## +---+----+--------+--------+--------+
## | id|type| 201601| 201602| 201603|
## +---+----+--------+--------+--------+
## | 2| C|[1,DOCK]| null| null|
## | 0| A|[1,DOCK]|[1,PORT]|[1,PORT]|
## | 1| B|[1,PORT]|[1,DOCK]| null|
## +---+----+--------+--------+--------+