(редактировать) то есть какая разница (семантически или с точки зрения исполнения) между
Ответ 2
Настоятельный СОВЕТ:
Всякий раз, когда у вас есть тяжелая инициализация, которую следует выполнить один раз для многих элементов RDD
а не один раз для каждого элемента RDD
, и если эту инициализацию, такую как создание объектов из сторонней библиотеки, нельзя сериализовать (чтобы Spark мог передавать ее через кластера к рабочим узлам), используйте mapPartitions()
вместо map()
. mapPartitions()
инициализация выполняется один раз для каждой рабочей задачи/потока/раздела, а не один раз для каждого элемента данных RDD
например: см. ниже.
val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB
connection.close() // close dbconnection here
newPartition.iterator // create a new iterator
})
Q2. flatMap
ведет себя как карта или как mapPartitions
?
Да. пожалуйста, см. пример 2 flatmap
.. это само за себя.
Q1. В чем разница между map
RDD и mapPartitions
map
работает с функцией, используемой на уровне элемента, в то время как mapPartitions
выполняет функцию на уровне раздела.
Пример сценария: если у нас есть 100K элементов в определенном разделе RDD
то мы будем запускать функцию, используемую преобразованием отображения 100K раз, когда мы используем map
.
И наоборот, если мы используем mapPartitions
то мы будем вызывать определенную функцию только один раз, но мы передадим все записи по 100 КБ и получим все ответы за один вызов функции.
Это приведет к увеличению производительности, поскольку map
работает с определенной функцией очень много раз, особенно если функция делает что-то дорогостоящее каждый раз, когда в этом нет необходимости, если мы передаем все элементы одновременно (в случае mappartitions
).
карта
Применяет функцию преобразования к каждому элементу СДР и возвращает результат как новый СДР.
Листинг Варианты
карта отображения [U: ClassTag] (f: T => U): RDD [U]
Пример:
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
mapPartitions
Это специализированная карта, которая вызывается только один раз для каждого раздела. Все содержимое соответствующих разделов доступно в виде последовательного потока значений через входной аргумент (Iterarator [T]). Пользовательская функция должна возвращать еще один Iterator [U]. Объединенные итераторы результатов автоматически преобразуются в новый СДР. Обратите внимание, что кортежи (3,4) и (6,7) отсутствуют в следующем результате из-за выбранного нами разбиения.
preservesPartitioning
указывает, preservesPartitioning
ли функция ввода разделитель, что должно быть false
если это не пара RDD, и функция ввода не изменяет ключи.
Листинг Варианты
def mapPartitions [U: ClassTag] (f: Iterator [T] => Iterator [U], preservedPartitioning: Boolean = false): RDD [U]
Пример 1
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res = List[(T, T)]()
var pre = iter.next
while (iter.hasNext)
{
val cur = iter.next;
res .::= (pre, cur)
pre = cur;
}
res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Пример 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
var res = List[Int]()
while (iter.hasNext) {
val cur = iter.next;
res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
}
res.iterator
}
x.mapPartitions(myfunc).collect
// some of the number are not outputted at all. This is because the random number generated for it is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
Вышеуказанная программа также может быть написана с использованием flatMap следующим образом.
Пример 2 с использованием flatmap
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
Заключение:
Преобразование mapPartitions
быстрее, чем map
поскольку оно вызывает вашу функцию один раз/раздел, а не один раз/элемент.
Дальнейшее чтение: foreach против foreachPartitions Когда использовать Что?