SQL по Spark Streaming
Это код для запуска простых SQL-запросов по Spark Streaming.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Duration
object StreamingSQL {
case class Persons(name: String, age: Int)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount")
val sc = new SparkContext(sparkConf)
// Create the context
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.textFileStream("C:/Users/pravesh.jain/Desktop/people/")
lines.foreachRDD(rdd=>rdd.foreach(println))
val sqc = new SQLContext(sc);
import sqc.createSchemaRDD
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
lines.foreachRDD(rdd=>{
rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).registerAsTable("data")
val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19")
teenagers.foreach(println)
})
ssc.start()
ssc.awaitTermination()
}
}
Как вы можете видеть, для запуска SQL поверх потоковой передачи запрос должен выполняться внутри метода foreachRDD.
Я хочу запустить SQL-соединение над данными, полученными из двух разных потоков. Есть ли способ сделать это?
Ответы
Ответ 1
Хорошо, я хотел бы подытожить обходной путь, по которому мы пришли после обсуждения в ответ Spiro. Его предложение сначала создать пустой стол, а затем вставить в него RDD. Единственная проблема заключается в том, что Spark не позволяет вставлять в таблицы еще. Вот что можно сделать:
Сначала создайте RDD с той же схемой, что и ожидаемая из вашего потока:
import sqlContext.createSchemaRDD
val d1=sc.parallelize(Array(("a",10),("b",3))).map(e=>Rec(e._1,e._2))
Затем сохраните его как файл Паркет
d1.saveAsParquetFile("/home/p1.parquet")
Теперь загрузите файл паркета и зарегистрируйте его как таблицу с помощью метода registerAsTable().
val parquetFile = sqlContext.parquetFile("/home/p1.parquet")
parquetFile.registerAsTable("data")
Теперь, когда вы получаете свой поток, просто примените foreachRDD() в своем потоке и продолжайте вставлять отдельные RDD в таблицу, созданную выше, используя insertInto() метод
dStream.foreachRDD(rdd=>{
rdd.insertInto("data")
})
Эта функция insertInto() отлично работает и позволяет собирать данные в таблицу. Теперь вы можете сделать то же самое для любого количества потоков, а затем запустить свои запросы.
Ответ 2
Как вы написали свой код, в итоге вы создаете последовательность небольших SchemaRDD при каждом запуске SQL-запроса. Хитрость заключается в том, чтобы сохранить каждый из них либо в накопительном RDD, либо в таблице накопления.
Сначала, подход таблицы, используя insertInto
:
Для каждого из ваших потоков сначала создайте резервную копию emty, которую вы регистрируете в виде таблицы, и получите пустую таблицу. Для вашего примера позвольте сказать, что вы называете это "allTeenagers".
Затем для каждого из ваших запросов используйте метод SchemaRDD insertInto
, чтобы добавить результат в эту таблицу:
teenagers.insertInto("allTeenagers")
Если вы сделаете это с обоими потоками, создав две отдельные таблицы накопления, вы можете присоединиться к ним, используя простой старый SQL-запрос.
(Примечание: на самом деле я не смог заставить его работать, и небольшой поиск заставляет меня сомневаться в том, что у кого-то еще есть, но я уверен, что понял смысл дизайна insertInto
, поэтому Я думаю, что это решение стоит записать.)
Второй, подход unionAll
(также есть метод union
, но это делает сложнее получить правильные типы):
Это связано с созданием исходного RDD - снова позвоните ему allTeenagers
.
// create initial SchemaRDD even if it empty, so the types work out right
var allTeenagers = sqc.sql("SELECT ...")
Затем каждый раз:
val teenagers = sqc.sql("SELECT ...")
allTeenagers = allTeenagers.unionAll(teenagers)
Возможно, нет необходимости говорить, что вам нужны столбцы, чтобы соответствовать.