Стратифицированная выборка в Spark

У меня есть набор данных, который содержит данные о пользователе и покупке. Вот пример, где первый элемент - userId, второй - productId, а третий - логический.

(2147481832,23355149,1)
(2147481832,973010692,1)
(2147481832,2134870842,1)
(2147481832,541023347,1)
(2147481832,1682206630,1)
(2147481832,1138211459,1)
(2147481832,852202566,1)
(2147481832,201375938,1)
(2147481832,486538879,1)
(2147481832,919187908,1)
... 

Я хочу убедиться, что я использую только 80% данных каждого пользователя и создаю RDD, а оставлю 20% и создаю еще один RDD. Позволяет направить поезд и проверить. Я хотел бы избегать использования groupBy для начала, поскольку он может создать проблему с памятью, поскольку набор данных является большим. Каков наилучший способ сделать это?

Я мог бы сделать следующее, но это не даст 80% каждого пользователя.

val percentData = data.map(x => ((math.random * 100).toInt, x._1. x._2, x._3)
val train = percentData.filter(x => x._1 < 80).values.repartition(10).cache()

Ответы

Ответ 1

Одна из возможностей заключается в ответе Холдена, и это другое:

Вы можете использовать преобразование sampleByKeyExact из класса PairRDDFunctions.

sampleByKeyExact (boolean withReplacement, scala.collection.Map, длинное семя) Возвратите подмножество этого RDD, взятого с помощью ключа (через стратифицированную выборку), содержащую точно math.ceil(numItems * samplingRate) для каждой страты (группа пар с одним и тем же ключом).

И вот как я буду делать:

Учитывая следующий список:

val list = List((2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1))

Я бы создал пару RDD, отображая всех пользователей как клавиши:

val data = sc.parallelize(list.toSeq).map(x => (x._1,(x._2,x._3)))

Затем я настрою fractions для каждого ключа следующим образом, так как вы заметили, что sampleByKeyExact берет карту дроби для каждого ключа:

val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap

То, что я здесь сделал, это, на самом деле, сопоставление ключей для поиска отдельных, а затем связать каждый ключ с долей, равным 0,8, тогда я собираю все как карту.

Чтобы пробовать сейчас, все, что мне нужно сделать, это:

import org.apache.spark.rdd.PairRDDFunctions
val sampleData = data.sampleByKeyExact(false, fractions, 2L)

или

val sampleData = data.sampleByKeyExact(withReplacement = false, fractions = fractions,seed = 2L)

Вы можете проверить количество ваших ключей или данных или данных:

scala > data.count
// [...]
// res10: Long = 12

scala > sampleData.count
// [...]
// res11: Long = 10

EDIT: Я решил добавить часть для выполнения стратифицированной выборки на DataFrame s.

Итак, мы рассмотрим те же данные (list) из приведенного выше примера.

val df = list.toDF("keyColumn","value1","value2")
df.show
// +----------+----------+------+
// | keyColumn|    value1|value2|
// +----------+----------+------+
// |2147481832|  23355149|     1|
// |2147481832| 973010692|     1|
// |2147481832|2134870842|     1|
// |2147481832| 541023347|     1|
// |2147481832|1682206630|     1|
// |2147481832|1138211459|     1|
// |2147481832| 852202566|     1|
// |2147481832| 201375938|     1|
// |2147481832| 486538879|     1|
// |2147481832| 919187908|     1|
// | 214748183| 919187908|     1|
// | 214748183|  91187908|     1|
// +----------+----------+------+

Нам понадобится базовый RDD для этого, на котором мы создаем кортежи элементов в этом RDD, определяя наш ключ как первый столбец:

val data: RDD[(Int, Row)] = df.rdd.keyBy(_.getInt(0))
val fractions: Map[Int, Double] = data.map(_._1)
                                      .distinct
                                      .map(x => (x, 0.8))
                                      .collectAsMap

val sampleData: RDD[Row] = data.sampleByKeyExact(withReplacement = false, fractions, 2L)
                               .values

val sampleDataDF: DataFrame = spark.createDataFrame(sampleData, df.schema) // you can use sqlContext.createDataFrame(...) instead for spark 1.6)

Теперь вы можете проверить количество ваших ключей или df или образец данных:

scala > df.count
// [...]
// res9: Long = 12

scala > sampleDataDF.count
// [...]
// res10: Long = 10

EDIT 2: Начиная с Spark 1.5.0 вы можете использовать метод DataFrameStatFunctions.sampleBy:

df.stat.sampleBy("keyColumn", fractions, seed)

Ответ 2

Что-то вроде этого может хорошо подходить для чего-то вроде "Blink DB", но давайте рассмотрим вопрос. Существует два способа интерпретировать то, что вы просили:

1) Вы хотите 80% ваших пользователей, и вы хотите, чтобы все данные были для них. 2) Вы хотите 80% данных каждого пользователя.

Для # 1 вы можете сделать карту, чтобы получить идентификаторы пользователя, вызывать разные, а затем пробовать 80% из них (вы можете посмотреть kFold в MLUtils или BernoulliCellSampler). Затем вы можете отфильтровать свои входные данные только с набором идентификаторов, которые вы хотите.

Для # 2 вы можете посмотреть BernoulliCellSampler и просто применить его напрямую.