Как найти медианный и квантиль с использованием Spark
Как найти медиану целых чисел RDD
, используя распределенный метод, IPython и Spark? RDD
составляет приблизительно 700 000 элементов и поэтому слишком велик для сбора и поиска медианы.
Этот вопрос похож на этот вопрос. Однако ответ на вопрос: Scala, о котором я не знаю.
Как рассчитать точную медианную с Apache Spark?
Используя мышление для ответа Scala, я пытаюсь написать аналогичный ответ в Python.
Я знаю, что сначала хочу отсортировать RDD
. Я не знаю как. Я вижу sortBy
(Сортирует этот RDD по данным keyfunc
) и sortByKey
(Сортирует этот RDD
, который предполагается состоять из (пары, значение).). Я думаю, что оба используют ключевое значение, а мой RDD
имеет только целые элементы.
- Во-первых, я думал делать
myrdd.sortBy(lambda x: x)
?
- Далее я найду длину rdd (
rdd.count()
).
- Наконец, я хочу найти элемент или 2 элемента в центре rdd. Мне также нужна помощь в этом методе.
EDIT:
У меня была идея. Может быть, я могу индексировать мой RDD
, а затем key = index и value = element. И тогда я могу попытаться сортировать по значению? Я не знаю, возможно ли это, потому что существует только метод sortByKey
.
Ответы
Ответ 1
Spark 2.0 +:
Вы можете использовать метод approxQuantile
, который реализует Алгоритм Гринвальда-Ханны:
Python
df.approxQuantile( "x" , [0.5], 0.25)
Код>
Scala
df.stat.approxQuantile( "x" , массив (0,5), 0,25)
Код>
где последний параметр является относительной ошибкой. Чем ниже число, тем точнее результаты и более дорогостоящие вычисления.
Так как Spark 2.2 ( SPARK-14352), он поддерживает оценку на нескольких столбцах
df.approxQuantile([ "x" , "y", "z" ], [0.5], 0.25)
Код>
и
df.approxQuantile(Array ( "x" , "y", "z" ), Array (0.5), 0.25)
Код>
Spark & lt; 2.0
Python
Как я уже упоминал в комментариях, это, скорее всего, не стоит всей суеты. Если данные относительно малы, как в вашем случае, тогда просто собирайте и вычисляйте медианную локально:
импортировать numpy как np
np.random.seed(323)
rdd = sc.parallelize(np.random.randint(1000000, размер = 700000))
% времени np.median(rdd.collect())
np.array(rdd.collect()). число-байт
Код>
Это занимает около 0,01 секунды на моем компьютере с несколькими годами и около 5,5 МБ памяти.
Если данные намного больше, сортировка будет ограничивающим фактором, поэтому вместо получения точного значения, вероятно, лучше попробовать, собрать и вычислить локально. Но если вы действительно хотите использовать Spark, что-то вроде этого должно сделать трюк (если я ничего не испортил):
из ползунка импорта numpy
время импорта
def quantile (rdd, p, sample = None, seed = None): "" Вычислить квантиль порядка p ∈ [0, 1] : rdd числовой rdd : p quantile (от 0 до 1) : примерная доля и rdd для использования. Если не указано, мы используем целый набор данных : семена генератора случайных чисел семян, которые будут использоваться с образцом "" Утверждение 0 < = p < = 1 утверждать, что образец равен None или 0 < образец < = 1
seed = seed, если семя не является None else time.time() rdd = rdd, если выборка отсутствует. else rdd.sample(False, sample, seed)
rddSortedWithIndex = (rdd. sortBy (lambda x: x). zipWithIndex(). map (lambda (x, i): (i, x)). Кэш())
n = rddSortedWithIndex.count() h = (n - 1) * p
rddX, rddXPlusOne = ( rddSortedWithIndex.lookup(х) [0] для x в int (floor (h)) + np.array([0L, 1L]))
return rddX + (h - floor (h)) * (rddXPlusOne - rddX)
Код>
И некоторые тесты:
np.median(rdd.collect()), quantile (rdd, 0.5)
## (500184,5, 500184,5)
np.percentile(rdd.collect(), 25), quantile (rdd, 0.25)
## (250506,75, 250506,75)
np.percentile(rdd.collect(), 75), quantile (rdd, 0.75)
(750069,25, 750069,25)
Код>
Наконец, определим медианную:
from functools import partial
медиана = частичная (квантиль, р = 0,5)
Код>
Пока это так хорошо, но требуется 4,66 с в локальном режиме без какой-либо сетевой связи. Вероятно, есть способ улучшить это, но зачем даже беспокоиться?
Независимый от языка (Hive UDAF):
Если вы используете HiveContext
, вы также можете использовать UDAF для Hive. Со встроенными значениями:
rdd.map(lambda x: (float (x),)). toDF ([ "x" ]). registerTempTable ( "df" )
sqlContext.sql( "SELECT percentile_approx (x, 0.5) FROM df" )
Код>
С непрерывными значениями:
sqlContext.sql( "SELECT percentile (x, 0.5) FROM df" )
Код>
В percentile_approx
вы можете передать дополнительный аргумент, который определяет количество используемых записей.
Ответ 2
Добавление решения, если вы хотите только метод RDD и не хотите переходить в DF.
Этот фрагмент может дать вам процентиль для двойной двойной копии.
Если вы вводите процентиль как 50, вы должны получить требуемую медиану.
Сообщите мне, есть ли какие-либо угловые случаи, не учтенные.
/**
* Gets the nth percentile entry for an RDD of doubles
*
* @param inputScore : Input scores consisting of a RDD of doubles
* @param percentile : The percentile cutoff required (between 0 to 100), e.g 90%ile of [1,4,5,9,19,23,44] = ~23.
* It prefers the higher value when the desired quantile lies between two data points
* @return : The number best representing the percentile in the Rdd of double
*/
def getRddPercentile(inputScore: RDD[Double], percentile: Double): Double = {
val numEntries = inputScore.count().toDouble
val retrievedEntry = (percentile * numEntries / 100.0 ).min(numEntries).max(0).toInt
inputScore
.sortBy { case (score) => score }
.zipWithIndex()
.filter { case (score, index) => index == retrievedEntry }
.map { case (score, index) => score }
.collect()(0)
}
Ответ 3
Вот метод, который я использовал с помощью оконных функций (с pyspark 2.2.0).
from pyspark.sql import DataFrame
class median():
""" Create median class with over method to pass partition """
def __init__(self, df, col, name):
assert col
self.column=col
self.df = df
self.name = name
def over(self, window):
from pyspark.sql.functions import percent_rank, pow, first
first_window = window.orderBy(self.column) # first, order by column we want to compute the median for
df = self.df.withColumn("percent_rank", percent_rank().over(first_window)) # add percent_rank column, percent_rank = 0.5 coressponds to median
second_window = window.orderBy(pow(df.percent_rank-0.5, 2)) # order by (percent_rank - 0.5)^2 ascending
return df.withColumn(self.name, first(self.column).over(second_window)) # the first row of the window corresponds to median
def addMedian(self, col, median_name):
""" Method to be added to spark native DataFrame class """
return median(self, col, median_name)
# Add method to DataFrame class
DataFrame.addMedian = addMedian
Затем вызовите метод addMedian для вычисления медианы col2:
from pyspark.sql import Window
median_window = Window.partitionBy("col1")
df = df.addMedian("col2", "median").over(median_window)
Наконец, вы можете группировать, если необходимо.
df.groupby("col1", "median")