Ответ 1
Наша цель - запустить параллельные SQL-запросы от рабочих Spark.
Настройка сборки
Добавьте соединитель и JDBC в libraryDependencies
в build.sbt
. Я только пробовал это с MySQL, поэтому я буду использовать это в своих примерах, но Postgres должен быть одинаков.
libraryDependencies ++= Seq(
jdbc,
"mysql" % "mysql-connector-java" % "5.1.29",
"org.apache.spark" %% "spark-core" % "1.0.1",
// etc
)
код
Когда вы создаете SparkContext
, вы указываете, какие банки копировать исполнителям. Включите соединительную банку. Хороший способ сделать это:
val classes = Seq(
getClass, // To get the jar with our own code.
classOf[mysql.jdbc.Driver] // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
Теперь Spark готов к подключению к базе данных. Каждый исполнитель выполнит часть запроса, чтобы результаты были готовы к распределенным вычислениям.
Для этого есть два варианта. Более старый подход заключается в использовании org.apache.spark.rdd.JdbcRDD
:
val rdd = new org.apache.spark.rdd.JdbcRDD(
sc,
() => {
sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
},
"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
0, 1000, 10,
row => row.getString("BOOK_TITLE")
)
Проверьте документацию по параметрам. Вкратце:
- У вас есть
SparkContext
. - Затем создается функция, которая создает соединение. Это будет вызвано для каждого рабочего для подключения к базе данных.
- Затем запрос SQL. Это должно быть похоже на пример и содержать заполнители для стартового и конечного ключей.
- Затем вы указываете диапазон ключей (от 0 до 1000 в моем примере) и количество разделов. Диапазон будет разделен между разделами. Таким образом, один пример исполнителя завершит выполнение
SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100
в этом примере. - И, наконец, у нас есть функция, которая преобразует
ResultSet
во что-то. В этом примере мы преобразуем его вString
, поэтому вы получитеRDD[String]
.
Так как Apache Spark версии 1.3.0 другой метод доступен через API DataFrame. Вместо JdbcRDD
вы создадите org.apache.spark.sql.DataFrame
:
val df = sqlContext.load("jdbc", Map(
"url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
"dbtable" -> "BOOKS"))
См. https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases для полного списка опций (диапазон клавиш и количество разделов можно установить так же, как с помощью JdbcRDD
).
Обновление
JdbcRDD
не поддерживает обновления. Но вы можете просто сделать их в foreachPartition
.
rdd.foreachPartition { it =>
val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
for (bookTitle <- it) {
del.setString(1, bookTitle)
del.executeUpdate
}
}
(Это создает одно соединение для каждого раздела. Если это проблема, используйте пул соединений!)
DataFrame
поддерживает обновления с помощью методов createJDBCTable
и insertIntoJDBC
.