Эффективный подсчет с Apache Spark
100 миллионов клиентов кликают 100 миллиардов раз на страницах нескольких веб-сайтов (скажем, 100 сайтов). И поток кликов доступен вам в большом наборе данных.
Используя абстракции Apache Spark, каков наиболее эффективный способ подсчета отдельных посетителей на веб-сайт?
Ответы
Ответ 1
visitors.distinct().count()
было бы очевидным путем, с первого раза в отдельности вы можете указать уровень parallelism, а также увидеть улучшение скорости. Если можно настроить посетителей как поток и использовать D-потоки, это будет делать счет в режиме реального времени. Вы можете напрямую передавать данные из каталога и использовать те же методы, что и в RDD, например:
val file = ssc.textFileStream("...")
file.distinct().count()
Последний вариант - использовать def countApproxDistinct(relativeSD: Double = 0.05): Long
, но это помечено как экспериментальное, но будет значительно быстрее, чем считать, если relativeSD
(отклонение std) выше.
РЕДАКТИРОВАТЬ:. Поскольку вы хотите, чтобы количество на веб-сайт можно было просто уменьшить на идентификаторе веб-сайта, это можно сделать эффективно (с помощью комбинаторов), поскольку счет является совокупным. Если у вас есть идентификатор корневого каталога идентификатора сайта, вы можете сделать это.
visitors.countDistinctByKey()
или visitors.countApproxDistinctByKey()
, еще один примерный - экспериментальный. Чтобы использовать приблизительный ключ, вам нужно PairRDD
Интересная сторона, если вы в порядке с приближениями и хотите получить быстрые результаты, вы можете посмотреть в blinkDB, сделанные теми же людьми, что и лаборатории искрового усилителя.
Ответ 2
Мне приходилось делать подобные вещи, одна вещь эффективности, которую вы можете сделать (это не настоящая искра) - сопоставить ваши идентификаторы vistor спискам байтов, а не GUID Strings, вы можете сохранить 4-х пространство тогда (в виде 2 символов представляет собой шестнадцатеричное кодирование одного байта, а Char использует 2 байта в String).
// Inventing these custom types purely for this question - don't do this in real life!
type VistorID = List[Byte]
type WebsiteID = Int
val visitors: RDD[(WebsiteID, VisitorID)] = ???
visitors.distinct().mapValues(_ => 1).reduceByKey(_ + _)
Заметьте, что вы также можете сделать:
visitors.distinct().map(_._1).countByValue()
но это также не масштабируется.
Ответ 3
Я заметил, что основная отличительная функция может быть значительно быстрее, когда вы запускаете ее на RDD, чем запускаете ее в коллекции DataFrame. Например:
DataFrame df = sqlContext.load(...)
df.distinct.count // 0.8 s
df.rdd.distinct.count // 0.2 s
Ответ 4
Если data
является RDD
пар (сайт, посетитель), то data.countApproxDistinctByKey(0.05)
предоставит вам RDD
of (site, count). Параметр можно уменьшить, чтобы получить большую точность за счет большей обработки.
Ответ 5
Spark 2.0 добавил ApproxCountDistinct в dataframe и SQL API:
https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#approxCountDistinct(org.apache.spark.sql.Column)
Ответ 6
Если вы хотите его на веб-страницу, то visitors.distinct()...
неэффективен. Если у вас много посетителей и много веб-страниц, вы выделяете огромное количество комбинаций (webpage, visitor)
, которые могут перегружать память.
Вот еще один способ:
visitors.groupByKey().map {
case (webpage, visitor_iterable)
=> (webpage, visitor_iterable.toArray.distinct.length)
}
Это требует, чтобы посетители одной веб-страницы помещались в память, поэтому во всех случаях это может быть не лучшим.