Spark Streaming: foreachRDD обновляет мое монго RDD

Я хочу создать новый RDD mongodb каждый раз, когда я вхожу внутрь foreachRDD. Однако у меня есть проблемы с сериализацией:

 mydstream  
   .foreachRDD(rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      // ssc is my StreamingContext
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })

Это даст мне ошибку:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected])

Любая идея?

Ответы

Ответ 1

Вы можете попытаться использовать rdd.context, который возвращает либо SparkContext, либо SparkStreamingContext (если rdd является DStream).

mydstream foreachRDD { rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })

На самом деле кажется, что RDD также имеет метод .sparkContext. Я, честно говоря, не знаю разницы, может быть, это псевдонимы (?).

Ответ 2

В моем понимании вам нужно добавить, если у вас есть объект, который не является сериализуемым, вам нужно передать его через foreachPartition, чтобы вы могли выполнить подключение к базе данных на каждом node перед запуском вашей обработки.

mydstream.foreachRDD(rdd => {
        rdd.foreachPartition{
          val mongoClient = MongoClient("localhost", 27017)
          val db = mongoClient(mongoDatabase)
          val coll = db(mongoCollection)
          // ssc is my StreamingContext
          val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }})