Ответ 1
JDBC не требуется
Spark подключается напрямую к метасольве Hive, а не через HiveServer2. Чтобы настроить это,
-
Поместите
hive-site.xml
в вашhive-site.xml
кclasspath
и укажитеhive.metastore.uri
в том месте, где размещалось ваше хранилище метаданных улья. Также см. Как подключиться к метасольве Hive программно в SparkSQL? -
Импортируйте
org.apache.spark.sql.hive.HiveContext
, так как он может выполнять SQL-запросы к таблицам Hive. -
Определите
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
-
Проверьте
sqlContext.sql("show tables")
чтобы увидеть, работает ли он
Вывод: если вы должны пойти с JDBC путь
Посмотрите, как удаленно соединяется apache spark с apache hive.
Обратите внимание, что Билайн также подключается через jdbc. из вашего журнала это само по себе очевидно.
[ml @master spark-2.0.0] $./bin/beeline Beeline версия 1.2.1.spark2 от Apache Hive beeline>! connect jdbc: hive2://remote_hive: 10000
Подключение к jdbc: hive2://remote_hive: 10000
Поэтому, пожалуйста, взгляните на эту интересную статью.
- Способ 1: вытянуть таблицу в Spark с помощью JDBC
- Способ 2: использование Spark JdbcRDD с драйвером Jives HiveServer2
- Способ 3: выборка набора данных на стороне клиента, а затем создание RDD вручную
В настоящее время драйвер HiveServer2 не позволяет нам использовать "сверкающий" метод 1 и 2, мы можем полагаться только на метод 3
Ниже приведен пример фрагмента кода, который может быть достигнут
Загрузка данных из одного кластера Hadoop (он же "удаленный") в другой (где мой Spark живет как "домашний") через соединение HiveServer2 JDBC.
import java.sql.Timestamp
import scala.collection.mutable.MutableList
case class StatsRec (
first_name: String,
last_name: String,
action_dtm: Timestamp,
size: Long,
size_p: Long,
size_d: Long
)
val conn: Connection = DriverManager.getConnection(url, user, password)
val res: ResultSet = conn.createStatement
.executeQuery("SELECT * FROM stats_201512301914")
val fetchedRes = MutableList[StatsRec]()
while(res.next()) {
var rec = StatsRec(res.getString("first_name"),
res.getString("last_name"),
Timestamp.valueOf(res.getString("action_dtm")),
res.getLong("size"),
res.getLong("size_p"),
res.getLong("size_d"))
fetchedRes += rec
}
conn.close()
val rddStatsDelta = sc.parallelize(fetchedRes)
rddStatsDelta.cache()
// Basically we are done. To check loaded data:
println(rddStatsDelta.count)
rddStatsDelta.collect.take(10).foreach(println)