Spark - Генерация случайных чисел
Я написал метод, который должен учитывать случайное число, чтобы имитировать распределение Бернулли. Я использую random.nextDouble
для генерации числа от 0 до 1, а затем принимаю мое решение на основе этого значения, учитывая мой параметр вероятности.
Моя проблема в том, что Spark генерирует одни и те же случайные числа в каждой итерации моей функции отображения цикла. Я использую API DataFrame
. Мой код следует за этим форматом:
val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)
for (m <- 1 to M) {
val newDF = sqlContext.createDataFrame(myDF
.map{row => RowFactory
.create(row.getString(0),
myClass.myMethod(row.getString(2), rand.nextDouble())
}, myDF.schema)
}
Вот класс:
class myClass extends Serializable {
val q = qProb
def myMethod(s: String, rand: Double) = {
if (rand <= q) // do something
else // do something else
}
}
Мне нужно новое случайное число каждый раз, когда вызывается myMethod
. Я также попытался создать число внутри моего метода с помощью java.util.Random
(scala.util.Random
v10 не расширяет Serializable
), как показано ниже, но я все равно получаю одинаковые числа в каждом цикле
val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()
Я провел некоторое исследование, и, похоже, это связано с детерминированной природой Спаркса.
Ответы
Ответ 1
Причина повторения одной и той же последовательности заключается в том, что случайный генератор создается и инициализируется семенем до разделения данных. Затем каждый раздел начинается с одного и того же случайного семени. Возможно, это не самый эффективный способ сделать это, но следующее должно работать:
val myClass = new MyClass()
val M = 3
for (m <- 1 to M) {
val newDF = sqlContext.createDataFrame(myDF
.map{
val rand = scala.util.Random
row => RowFactory
.create(row.getString(0),
myClass.myMethod(row.getString(2), rand.nextDouble())
}, myDF.schema)
}
Ответ 2
Просто используйте функцию SQL rand
:
import org.apache.spark.sql.functions._
//df: org.apache.spark.sql.DataFrame = [key: int]
df.select($"key", rand() as "rand").show
+---+-------------------+
|key| rand|
+---+-------------------+
| 1| 0.8635073400704648|
| 2| 0.6870153659986652|
| 3|0.18998048357873532|
+---+-------------------+
df.select($"key", rand() as "rand").show
+---+------------------+
|key| rand|
+---+------------------+
| 1|0.3422484248879837|
| 2|0.2301384925817671|
| 3|0.6959421970071372|
+---+------------------+
Ответ 3
Согласно этот пост, лучшим решением является не размещение new scala.util.Random
внутри карты, а также полностью вне (т.е. в код драйвера), но в промежуточном mapPartitionsWithIndex
:
import scala.util.Random
val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
val rand = new scala.util.Random(indx+myAppSeed)
iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}
Ответ 4
Использование Spark Dataset API, возможно, для использования в аккумуляторе:
df.withColumn("_n", substring(rand(),3,4).cast("bigint"))