Ответ 1
Если вы конвертируете в DataFrame, все это становится намного проще - вы можете просто самостоятельно присоединить данные к себе и найти среднее. Скажем, у меня есть ряд данных вроде этого:
tsDF.show
date amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78
Который работает как:
tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date SUM(amount) COUNT(date)
1970-01-01 22.0 3
1970-01-02 27.9 2
1970-01-03 15.0 3
1970-01-04 26.5 4
1970-01-05 33.76 4
Затем мне нужно будет создать UDF для сдвига даты для условия соединения (обратите внимание, что я использую только 2-дневное окно с помощью offset = -2):
def dateShift(myDate: java.sql.Date): java.sql.Date = {
val offset = -2;
val cal = Calendar.getInstance;
cal.setTime(myDate);
cal.add(Calendar.DATE, offset);
new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date,java.sql.Date](dateShift)
И тогда я мог бы легко найти двухдневный скользящий средний:
val windowDF = tsDF.select($"date")
.groupBy($"date")
.agg($"date")
.join(
tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
$"r_date" > udfDateShift($"date") and $"r_date" <= $"date"
)
.groupBy($"date")
.agg($"date",avg($"r_amount") as "2 day avg amount / record")
val windowDF.show
date 2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
Пока это не совсем то, что вы пытались сделать, вы видите, как вы можете использовать самосоединение DataFrame для извлечения текущих средних из набора данных. Надеюсь, вы нашли это полезным.