Запросы с потоковыми источниками должны выполняться с помощью writeStream.start();
Я пытаюсь прочитать сообщения от kafka (версия 10) в spark и пытаюсь распечатать его.
import spark.implicits._
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.config("spark.master", "local")
.getOrCreate()
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
ds1.collect.foreach(println)
ds1.writeStream
.format("console")
.start()
ds1.printSchema()
получаю исключение ошибки в потоке "main"
org.apache.spark.sql.AnalysisException: запросы с потоковыми источниками должен выполняться с writeStream.start() ;;
Ответы
Ответ 1
Вы разветвляете план запроса: из того же ds1, который вы пытаетесь:
-
ds1.collect.foreach(...)
-
ds1.writeStream.format(...){...}
Но вы вызываете только .start()
во второй ветки, оставляя другую висячую без завершения, что, в свою очередь, вызывает исключение, которое вы получаете назад.
Решение состоит в том, чтобы запустить обе ветки и дождаться завершения.
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
val query1 = ds1.collect.foreach(println)
.writeStream
.format("console")
.start()
val query2 = ds1.writeStream
.format("console")
.start()
ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()
Ответ 2
я исправил проблему, используя следующий код.
val df = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "streamTest2")
.load();
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
Ответ 3
Я много боролся с этим вопросом. Я попробовал каждое из предложенных решений из различных блогов. Но в моём случае между вызовом start() по запросу и несколькими запросами, наконец, я вызывал функцию awaitTerminate(), вызывающую это.
Пожалуйста, попробуйте таким образом, это прекрасно работает для меня. Рабочий пример:
val query = df.writeStream
.outputMode("append")
.format("console")
.start().awaitTermination();
Если вы напишите таким образом, это вызовет исключение/ошибку:
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
// some statement
// some statement
query.awaitTermination();
выдаст данное исключение и закроет ваш драйвер потоковой передачи.
Ответ 4
при чтении сообщения об ошибке
org.apache.spark.sql.AnalysisException: запросы с потоковыми источниками должен быть выполнен с writeStream.start();;
я нашел эту статью , которая хорошо объясняет ее и дает другой подход. Я попробую сам и опубликую результаты позже, если это сработает для меня.
Ответ 5
Пожалуйста, удалите ds1.collect.foreach(println)
и ds1.printSchema()
, используйте outputMode
и awaitAnyTermination
для фонового процесса. Ожидание, пока не завершится какой-либо из запросов в связанном spark.streams
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.config("spark.master", "local[*]")
.getOrCreate()
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA") .load()
val consoleOutput1 = ds1.writeStream
.outputMode("update")
.format("console")
.start()
spark.streams.awaitAnyTermination()
|key|value|topic|partition|offset|
+---+-----+-----+---------+------+
+---+-----+-----+---------+------+