Вычислить стандартное отклонение сгруппированных данных в Spark DataFrame
У меня есть журналы пользователей, которые я взял из csv и преобразован в DataFrame, чтобы использовать функции запросов SparkSQL. Один пользователь будет создавать многочисленные записи в час, и я хотел бы собрать некоторую базовую статистическую информацию для каждого пользователя; на самом деле просто количество экземпляров пользователя, среднее и стандартное отклонение многочисленных столбцов. Я смог быстро получить информацию о среднем и подсчете, используя groupBy ($ "пользователь" ) и агрегатор с функциями SparkSQL для count и avg:
val meanData = selectedData.groupBy($"user").agg(count($"logOn"),
avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"),
avg($"repliesPerHour"), avg($"duration"))
Однако я не могу найти столь же элегантный способ вычисления стандартного отклонения. Пока я могу вычислить его только путем сопоставления строки, двойной пары и использования StatCounter(). Утилита stdev:
val stdevduration = duration.groupByKey().mapValues(value =>
org.apache.spark.util.StatCounter(value).stdev)
Однако это возвращает RDD, и я хотел бы попытаться сохранить все это в DataFrame, чтобы дальнейшие запросы были возможны по возвращенным данным.
Ответы
Ответ 1
Искра 1.6 +
Вы можете использовать stddev_pop
для вычисления стандартного отклонения населения и stddev
/stddev_samp
для вычисления несмещенного стандартного отклонения выборки:
import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}
selectedData.groupBy($"user").agg(stdev_pop($"duration"))
Искра 1.5 и ниже (оригинальный ответ):
Не так красиво и смещенно (так же, как и значение, возвращаемое из describe
), но используя формулу:
![wikipedia sdev]()
вы можете сделать что-то вроде этого:
import org.apache.spark.sql.functions.sqrt
selectedData
.groupBy($"user")
.agg((sqrt(
avg($"duration" * $"duration") -
avg($"duration") * avg($"duration")
)).alias("duration_sd"))
Вы можете, конечно, создать функцию для уменьшения беспорядка:
import org.apache.spark.sql.Column
def mySd(col: Column): Column = {
sqrt(avg(col * col) - avg(col) * avg(col))
}
df.groupBy($"user").agg(mySd($"duration").alias("duration_sd"))
Можно также использовать Hive UDF:
df.registerTempTable("df")
sqlContext.sql("""SELECT user, stddev(duration)
FROM df
GROUP BY user""")
Источник изображения: https://en.wikipedia.org/wiki/Standard_deviation
Ответ 2
Принятый код не компилируется, так как содержит опечатку (как указал MRez). Фрагмент ниже работает и проверяется.
Для Spark 2. 0+ :
import org.apache.spark.sql.functions._
val _avg_std = df.groupBy("user").agg(
avg(col("duration").alias("avg")),
stddev(col("duration").alias("stdev")),
stddev_pop(col("duration").alias("stdev_pop")),
stddev_samp(col("duration").alias("stdev_samp"))
)