Ответ 1
Вы можете использовать функцию окна lag
следующим образом
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
df = sc.parallelize([(4, 9.0), (3, 7.0), (2, 3.0), (1, 5.0)]).toDF(["id", "num"])
w = Window().partitionBy().orderBy(col("id"))
df.select("*", lag("num").over(w).alias("new_col")).na.drop().show()
## +---+---+-------+
## | id|num|new_col|
## +---+---+-------|
## | 2|3.0| 5.0|
## | 3|7.0| 3.0|
## | 4|9.0| 7.0|
## +---+---+-------+
но есть некоторые важные проблемы:
- Если вам нужна глобальная операция (не секционированная каким-либо другим столбцом/столбцами), она крайне неэффективна.
- Вам нужен естественный способ заказать ваши данные.
В то время как вторая проблема почти никогда не является проблемой, первая из них может быть разрывом. Если это так, вы должны просто преобразовать ваш DataFrame
в RDD и вычислить lag
вручную. См. Например:
- Как преобразовать данные со скользящим окном по данным временных рядов в Pyspark
- Apache Spark Moving Average (написано в Scala, но может быть настроено для PySpark. Обязательно сначала прочитайте комментарии).
Другие полезные ссылки: