Карта не может быть сериализована в scala?

Я новичок в Scala. Почему функция "map" не является сериализуемой? Как сделать сериализуемым? Например, если мой код выглядит следующим образом:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
  val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    val a = lst.groupBy(x => x._1).mapValues(_.size)
    //val b= a.map(x => x._2)
    res = res ::: List(cur)
  }
  res.iterator
}

data.mapPartitions(myfunc).collect

Если я раскомментирую строку

val b= a.map(x => x._2)

Код возвращает исключение:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
Serialization stack:
    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(1 -> 3))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: a, type: interface scala.collection.immutable.Map)

Большое спасибо.

Ответы

Ответ 1

Хорошо известная ошибка scala: https://issues.scala-lang.org/browse/SI-7005 Карта # mapValues ​​не является сериализуемой

У нас есть эта проблема в наших приложениях Spark, map(identity) решает проблему

rdd.groupBy(_.segment).mapValues(v => ...).map(identity)

Ответ 2

Фактическая реализация функции mapValues ​​приведена ниже, и, как вы видите, она не сериализуема и создает только представление, а не правильное существование данных, и, следовательно, вы получаете эту ошибку. Ситуация мудрая mapValues ​​имеет много преимуществ.

protected class MappedValues[C](f: B => C) extends AbstractMap[A, C] with DefaultMap[A, C] {
override def foreach[D](g: ((A, C)) => D): Unit = for ((k, v) <- self) g((k, f(v)))
def iterator = for ((k, v) <- self.iterator) yield (k, f(v))
override def size = self.size
override def contains(key: A) = self.contains(key)
def get(key: A) = self.get(key).map(f)
}

Ответ 3

Вы пытались запустить этот же код в приложении? Я подозреваю, что это проблема с искровой оболочкой. Если вы хотите заставить его работать в искровой оболочке, вы можете попробовать обернуть определение myfunc и его приложение в фигурные скобки следующим образом:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

val result = { 
  def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
    val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
    var res = List[Int]()
    while (iter.hasNext) {
      val cur = iter.next
      val a = lst.groupBy(x => x._1).mapValues(_.size)
      val b= a.map(x => x._2)
      res = res ::: List(cur)
    }
    res.iterator
  }
  data.mapPartitions(myfunc).collect
}