Apache Spark - foreach против foreachPartitions Когда использовать Что?
Я хотел бы знать, приведет ли foreachPartitions
к лучшей производительности из-за более высокого уровня parallelism по сравнению с методом foreach
, учитывая случай, когда я протекаю через RDD
чтобы выполнить некоторые суммы в переменной аккумулятора.
Ответы
Ответ 1
foreach
автоматический запуск цикла на многих узлах.
Однако иногда вы хотите выполнять некоторые операции над каждым node. Например, выполните подключение к базе данных. Вы не можете просто установить соединение и передать его в функцию foreach
: соединение выполняется только на одном node.
Итак, с помощью foreachPartition
вы можете установить соединение с базой данных на каждом node перед запуском цикла.
Ответ 2
foreach
и foreachPartitions
- это действия.
foreach (функция): единица измерения
Универсальная функция для вызова операций с побочными эффектами. Для каждого элемента в RDD он вызывает переданную функцию. Это обычно используется для манипуляции с аккумуляторами или записи во внешние магазины.
Примечание. Изменение переменных, отличных от Accumulators, вне функции foreach()
может привести к неопределенному поведению. См. Понимание замыканий для более подробной информации.
пример:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
foreachPartition (функция): блок
Аналогично foreach()
, но вместо вызова функции для каждого элемента он вызывает ее для каждого раздела. Функция должна быть в состоянии принять итератор. Это более эффективно, чем foreach()
потому что оно уменьшает количество вызовов функций (так же, как и mapPartitions
()).
Использование foreachPartition
- Пример 1: для каждого раздела одно соединение с базой данных (внутри каждого блока раздела), которое вы хотите использовать, это пример использования того, как это можно сделать с помощью scala.
/**
* Insert in to database using foreach partition.
*
* @param sqlDatabaseConnectionString
* @param sqlTableName
*/
def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {
//numPartitions = number of simultaneous DB connections you can planning to give
datframe.repartition(numofpartitionsyouwant)
val tableHeader: String = dataFrame.columns.mkString(",")
dataFrame.foreachPartition { partition =>
// Note : Each partition one connection (more better way is to use connection pools)
val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
//Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
partition.grouped(1000).foreach {
group =>
val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
group.foreach {
record => insertString.append("('" + record.mkString(",") + "'),")
}
sqlExecutorConnection.createStatement()
.executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
+ insertString.stripSuffix(","))
}
sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
}
}
Использование foreachPartition
с sparkstreaming (dstreams) и производителем кафки
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
val producer = createKafkaProducer()
partitionOfRecords.foreach { message =>
producer.send(message)
}
producer.close()
}
}
Примечание. Если вы хотите избежать такого способа создания производителя один раз для каждого раздела, лучше всего транслировать производителя с помощью sparkContext.broadcast
поскольку производитель Kafka является асинхронным и интенсивно буферизует данные перед отправкой.
Аккумуляторные образцы фрагментов, чтобы поиграть с ним... с помощью которого вы можете проверить производительность
test("Foreach - Spark") {
import spark.implicits._
var accum = sc.longAccumulator
sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x))
assert(accum.value == 6L)
}
test("Foreach partition - Spark") {
import spark.implicits._
var accum = sc.longAccumulator
sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_)))
assert(accum.value == 6L)
}
Заключение:
foreachPartition
операций над разделами, так что, очевидно, это будет лучше, чем foreach
Практическое правило:
foreachPartition
следует использовать, когда вы обращаетесь к дорогостоящим ресурсам, таким как соединения с базой данных или производитель kafka и т.д., которые инициализируют один на раздел, а не один на элемент (foreach
). Что касается аккумуляторов, вы можете измерить производительность с помощью вышеуказанных методов тестирования, которые должны работать быстрее и в случае аккумуляторов.
Также... см. Карту против картиров, которые имеют похожую концепцию, но они являются трансформациями.
Ответ 3
На самом деле разница между foreach
и foreachPartitions
невелика. Под обложками все, что делает foreach
, вызывает итератор foreach
с помощью предоставленной функции. foreachPartition
просто дает вам возможность сделать что-то вне цикла итератора, как правило, что-то дорогое, например, разворачивание соединения с базой данных или что-то в этом роде. Итак, если у вас нет ничего, что можно было бы сделать один раз для каждого итератора node и повторно использовать его, то я бы предложил использовать foreach
для большей ясности и уменьшения сложности.
Ответ 4
foreachPartition
не означает, что это за активность node, а выполняется для каждого раздела, и, возможно, у вас может быть большое количество разделов по сравнению с количеством узлов, в этом случае ваша производительность может быть снижена. Если вы намереваетесь выполнять активность на уровне node, то объяснение здесь может быть полезным, хотя оно не проверено мной
Ответ 5
foreachPartition
полезен только при повторном выполнении данных, которые вы агрегируете по разделам.
Хорошим примером является обработка кликов по каждому пользователю. Вы хотите очистить свой кэш вычислений каждый раз, когда вы закончите пользовательский поток событий, но держите его между записями одного и того же пользователя, чтобы рассчитать некоторые данные о поведении пользователя.