Ответ 1
Одна из возможностей заключается в ответе Холдена, и это другое:
Вы можете использовать преобразование sampleByKeyExact из класса PairRDDFunctions.
sampleByKeyExact (boolean withReplacement, scala.collection.Map, длинное семя) Возвратите подмножество этого RDD, взятого с помощью ключа (через стратифицированную выборку), содержащую точно math.ceil(numItems * samplingRate) для каждой страты (группа пар с одним и тем же ключом).
И вот как я буду делать:
Учитывая следующий список:
val list = List((2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1))
Я бы создал пару RDD, отображая всех пользователей как клавиши:
val data = sc.parallelize(list.toSeq).map(x => (x._1,(x._2,x._3)))
Затем я настрою fractions
для каждого ключа следующим образом, так как вы заметили, что sampleByKeyExact берет карту дроби для каждого ключа:
val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap
То, что я здесь сделал, это, на самом деле, сопоставление ключей для поиска отдельных, а затем связать каждый ключ с долей, равным 0,8, тогда я собираю все как карту.
Чтобы пробовать сейчас, все, что мне нужно сделать, это:
import org.apache.spark.rdd.PairRDDFunctions
val sampleData = data.sampleByKeyExact(false, fractions, 2L)
или
val sampleData = data.sampleByKeyExact(withReplacement = false, fractions = fractions,seed = 2L)
Вы можете проверить количество ваших ключей или данных или данных:
scala > data.count
// [...]
// res10: Long = 12
scala > sampleData.count
// [...]
// res11: Long = 10
EDIT: Я решил добавить часть для выполнения стратифицированной выборки на DataFrame
s.
Итак, мы рассмотрим те же данные (list
) из приведенного выше примера.
val df = list.toDF("keyColumn","value1","value2")
df.show
// +----------+----------+------+
// | keyColumn| value1|value2|
// +----------+----------+------+
// |2147481832| 23355149| 1|
// |2147481832| 973010692| 1|
// |2147481832|2134870842| 1|
// |2147481832| 541023347| 1|
// |2147481832|1682206630| 1|
// |2147481832|1138211459| 1|
// |2147481832| 852202566| 1|
// |2147481832| 201375938| 1|
// |2147481832| 486538879| 1|
// |2147481832| 919187908| 1|
// | 214748183| 919187908| 1|
// | 214748183| 91187908| 1|
// +----------+----------+------+
Нам понадобится базовый RDD для этого, на котором мы создаем кортежи элементов в этом RDD, определяя наш ключ как первый столбец:
val data: RDD[(Int, Row)] = df.rdd.keyBy(_.getInt(0))
val fractions: Map[Int, Double] = data.map(_._1)
.distinct
.map(x => (x, 0.8))
.collectAsMap
val sampleData: RDD[Row] = data.sampleByKeyExact(withReplacement = false, fractions, 2L)
.values
val sampleDataDF: DataFrame = spark.createDataFrame(sampleData, df.schema) // you can use sqlContext.createDataFrame(...) instead for spark 1.6)
Теперь вы можете проверить количество ваших ключей или df
или образец данных:
scala > df.count
// [...]
// res9: Long = 12
scala > sampleDataDF.count
// [...]
// res10: Long = 10
EDIT 2: Начиная с Spark 1.5.0 вы можете использовать метод DataFrameStatFunctions.sampleBy:
df.stat.sampleBy("keyColumn", fractions, seed)