Spark DataFrames UPSERT для таблицы Postgres
Я использую Apache Spark DataFrames для объединения двух источников данных и получения результата в качестве другого DataFrame. Я хочу записать результат в другую таблицу Postgres. Я вижу эту опцию:
myDataFrame.write.jdbc(url, table, connectionProperties)
Но то, что я хочу сделать, это UPSERT dataframe в таблицу на основе основного ключа таблицы. Как это сделать? Я использую Spark 1.6.0.
Ответы
Ответ 1
Не поддерживается. DataFrameWriter
может либо добавлять, либо перезаписывать существующую таблицу. Если ваше приложение требует более сложной логики, вам придется иметь дело с этим вручную.
Один из вариантов - использовать действие (foreach
, foreachPartition
) со стандартным соединением JDBC. Еще один - записать во временное и обработать остальное непосредственно в базе данных.
Ответ 2
KrisP имеет на это право. Лучший способ сделать упор - не через подготовленное выражение. Важно отметить, что этот метод будет вставлять по одному с таким количеством разделов, сколько у вас есть работников. Если вы хотите сделать это в пакетном режиме, вы можете также
import java.sql._
dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection("JDBCURL")
val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
batch.grouped("# Of Rows you want per batch").foreach { session =>
session.foreach { x =>
st.setDouble(1, x.getDouble(1))
st.addBatch()
}
st.executeBatch()
}
dbc.close()
}
Это выполнит пакеты для каждого работника и закроет соединение с БД. Это дает вам контроль над тем, сколько рабочих, сколько партий и позволяет вам работать в этих пределах.
Ответ 3
Если вы собираетесь делать это вручную и с помощью опции 1, упомянутой zero323, вы должны взглянуть на исходный код Spark для инструкции insert
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val columns = rddSchema.fields.map(_.name).mkString(",")
val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
conn.prepareStatement(sql)
}
PreparedStatement
часть java.sql
и имеет методы, такие как execute()
и executeUpdate()
. Разумеется, вам все равно придется изменить sql
.
Ответ 4
Чтобы вставить JDBC, вы можете использовать
dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)
Кроме того, Dataframe.write предоставляет вам DataFrameWriter и имеет некоторые методы для вставки блока данных.
def insertInto(tableName: String): Unit
Вставляет содержимое DataFrame в указанную таблицу. Это требует, чтобы схема DataFrame была такой же, как схема таблицы.
Поскольку он вставляет данные в существующую таблицу, формат или параметры будут игнорироваться.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
Пока ничего не обновить отдельные записи из коробки из-за искры, хотя