Spark Streaming: foreachRDD обновляет мое монго RDD
Я хочу создать новый RDD mongodb каждый раз, когда я вхожу внутрь foreachRDD
. Однако у меня есть проблемы с сериализацией:
mydstream
.foreachRDD(rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })
Это даст мне ошибку:
object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected])
Любая идея?
Ответы
Ответ 1
Вы можете попытаться использовать rdd.context, который возвращает либо SparkContext, либо SparkStreamingContext (если rdd является DStream).
mydstream foreachRDD { rdd => {
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })
На самом деле кажется, что RDD также имеет метод .sparkContext
. Я, честно говоря, не знаю разницы, может быть, это псевдонимы (?).
Ответ 2
В моем понимании вам нужно добавить, если у вас есть объект, который не является сериализуемым, вам нужно передать его через foreachPartition
, чтобы вы могли выполнить подключение к базе данных на каждом node перед запуском вашей обработки.
mydstream.foreachRDD(rdd => {
rdd.foreachPartition{
val mongoClient = MongoClient("localhost", 27017)
val db = mongoClient(mongoDatabase)
val coll = db(mongoCollection)
// ssc is my StreamingContext
val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }})