Pyspark: скользящее среднее с использованием данных таймсерия
У меня есть набор данных, состоящий из столбца временной метки и столбца доллара. Я хотел бы найти среднее количество долларов в неделю, заканчивающееся на отметке времени каждой строки. Сначала я смотрел на функцию pyspark.sql.functions.window, но это заставляет данные за неделю.
Вот пример:
%pyspark
import datetime
from pyspark.sql import functions as F
df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))
w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()
Это приводит к двум записям:
| start | end | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|
Функция окна связывает данные временных рядов, а не выполняет скользящее среднее.
Есть ли способ выполнить скользящее среднее, когда я вернусь к недельной средней для каждой строки с периодом времени, заканчивающимся на timestampGMT строки?
EDIT:
Ответ Чжан ниже близок к тому, что я хочу, но не совсем то, что я хотел бы видеть.
Вот лучший пример, чтобы показать, к чему я пытаюсь добраться:
%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))
Это приводит к следующему файловому кадру:
dollars timestampGMT rolling_average
25 2017-03-18 11:27:18.0 25
17 2017-03-10 15:27:18.0 15
13 2017-03-15 12:27:18.0 15
Я хотел бы, чтобы среднее значение было в течение недели, продолжая дату в столбце timestampGMT, что приведет к следующему:
dollars timestampGMT rolling_average
17 2017-03-10 15:27:18.0 17
13 2017-03-15 12:27:18.0 15
25 2017-03-18 11:27:18.0 19
В приведенных выше результатах значение roll_average для 2017-03-10 равно 17, так как предшествующих записей нет. Средство roll_average для 2017-03-15 равно 15, потому что оно усредняет 13 с 2017-03-15 и 17 с 2017-03-10, которое падает с предыдущим 7-дневным окном. Скользящий средний показатель для 2017-03-18 составляет 19, поскольку он усредняет 25 с 2017-03-18 и 13 с 2017-03-10, который падает с предыдущим 7-дневным окном, и он не включает 17 от 2017 года -03-10, потому что это не падает с предыдущим 7-дневным окном.
Есть ли способ сделать это, а не окно binning, где еженедельные окна не перекрываются?
Ответы
Ответ 1
Я выяснил, как правильно рассчитать скользящее/скользящее среднее, используя этот стекопоток:
Функции Spark Window - диапазон между датами
Основная идея состоит в том, чтобы преобразовать ваш столбец отметки времени в секунды, а затем вы можете использовать функцию rangeBetween в классе pyspark.sql.Window, чтобы включить правильные строки в ваше окно.
Вот решенный пример:
%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window
#function to calculate number of seconds from number of days
days = lambda i: i * 86400
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
(13, "2017-03-15T12:27:18+00:00"),
(25, "2017-03-18T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))
df = df.withColumn('rolling_average', F.avg("dollars").over(w))
Это приводит к точному столбцу скользящих средних, который я искал:
dollars timestampGMT rolling_average
17 2017-03-10 15:27:18.0 17.0
13 2017-03-15 12:27:18.0 15.0
25 2017-03-18 11:27:18.0 19.0
Ответ 2
Я добавлю вариант, который я лично нашел очень полезным. Я надеюсь, что кто-то найдет это полезным:
Если вы хотите группировать, то в соответствующих группах вычислите скользящее среднее:
Пример кадра данных:
from pyspark.sql.window import Window
from pyspark.sql import functions as func
df = spark.createDataFrame([("tshilidzi", 17.00, "2018-03-10T15:27:18+00:00"),
("tshilidzi", 13.00, "2018-03-11T12:27:18+00:00"),
("tshilidzi", 25.00, "2018-03-12T11:27:18+00:00"),
("thabo", 20.00, "2018-03-13T15:27:18+00:00"),
("thabo", 56.00, "2018-03-14T12:27:18+00:00"),
("thabo", 99.00, "2018-03-15T11:27:18+00:00"),
("tshilidzi", 156.00, "2019-03-22T11:27:18+00:00"),
("thabo", 122.00, "2018-03-31T11:27:18+00:00"),
("tshilidzi", 7000.00, "2019-04-15T11:27:18+00:00"),
("ash", 9999.00, "2018-04-16T11:27:18+00:00")
],
["name", "dollars", "timestampGMT"])
# we need this timestampGMT as seconds for our Window time frame
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df.show(10000, False)
Выход:
+---------+-------+---------------------+
|name |dollars|timestampGMT |
+---------+-------+---------------------+
|tshilidzi|17.0 |2018-03-10 17:27:18.0|
|tshilidzi|13.0 |2018-03-11 14:27:18.0|
|tshilidzi|25.0 |2018-03-12 13:27:18.0|
|thabo |20.0 |2018-03-13 17:27:18.0|
|thabo |56.0 |2018-03-14 14:27:18.0|
|thabo |99.0 |2018-03-15 13:27:18.0|
|tshilidzi|156.0 |2019-03-22 13:27:18.0|
|thabo |122.0 |2018-03-31 13:27:18.0|
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|
|ash |9999.0 |2018-04-16 13:27:18.0|
+---------+-------+---------------------+
Чтобы рассчитать скользящее среднее на основе name
и сохранить все строки:
#create window by casting timestamp to long (number of seconds)
w = (Window()
.partitionBy(col("name"))
.orderBy(F.col("timestampGMT").cast('long'))
.rangeBetween(-days(7), 0))
df2 = df.withColumn('rolling_average', F.avg("dollars").over(w))
df2.show(100, False)
Выход:
+---------+-------+---------------------+------------------+
|name |dollars|timestampGMT |rolling_average |
+---------+-------+---------------------+------------------+
|ash |9999.0 |2018-04-16 13:27:18.0|9999.0 |
|tshilidzi|17.0 |2018-03-10 17:27:18.0|17.0 |
|tshilidzi|13.0 |2018-03-11 14:27:18.0|15.0 |
|tshilidzi|25.0 |2018-03-12 13:27:18.0|18.333333333333332|
|tshilidzi|156.0 |2019-03-22 13:27:18.0|156.0 |
|tshilidzi|7000.0 |2019-04-15 13:27:18.0|7000.0 |
|thabo |20.0 |2018-03-13 17:27:18.0|20.0 |
|thabo |56.0 |2018-03-14 14:27:18.0|38.0 |
|thabo |99.0 |2018-03-15 13:27:18.0|58.333333333333336|
|thabo |122.0 |2018-03-31 13:27:18.0|122.0 |
+---------+-------+---------------------+------------------+
Ответ 3
Вы имеете в виду это:
df = spark.createDataFrame([(17, "2017-03-11T15:27:18+00:00"),
(13, "2017-03-11T12:27:18+00:00"),
(21, "2017-03-17T11:27:18+00:00")],
["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', f.avg("dollars").over(Window.partitionBy(f.window("timestampGMT", "7 days"))))
Вывод:
+-------+-------------------+---------------+
|dollars|timestampGMT |rolling_average|
+-------+-------------------+---------------+
|21 |2017-03-17 19:27:18|21.0 |
|17 |2017-03-11 23:27:18|15.0 |
|13 |2017-03-11 20:27:18|15.0 |
+-------+-------------------+---------------+
Ответ 4
Стоит отметить, что если вы не заботитесь о точных датах, но хотите получить среднее значение за последние 30 дней, вы можете использовать функцию rowBetween следующим образом:
w = Window.orderBy('timestampGMT').rowsBetween(-7, 0)
df = eurPrices.withColumn('rolling_average', F.avg('dollars').over(w))
Так как вы заказываете по датам, это займет последние 7 случаев. Вы сохраняете все кастинги.