Ответ 1
Это известная проблема с Spark REPL. Вы можете найти более подробную информацию в SPARK-2620. Он влияет на несколько операций в Spark REPL, включая большинство преобразований на PairwiseRDDs
. Например:
case class Foo(x: Int)
val foos = Seq(Foo(1), Foo(1), Foo(2), Foo(2))
foos.distinct.size
// Int = 2
val foosRdd = sc.parallelize(foos, 4)
foosRdd.distinct.count
// Long = 4
foosRdd.map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(1),1), (Foo(1),1), (Foo(2),1), (Foo(2),1))
foosRdd.first == foos.head
// Boolean = false
Foo.unapply(foosRdd.first) == Foo.unapply(foos.head)
// Boolean = true
Что еще хуже, так это то, что результаты зависят от распределения данных:
sc.parallelize(foos, 1).distinct.count
// Long = 2
sc.parallelize(foos, 1).map((_, 1)).reduceByKey(_ + _).collect
// Array[(Foo, Int)] = Array((Foo(2),2), (Foo(1),2))
Самое простое, что вы можете сделать, это определить и упаковать классы классов вне REPL. Любой код, представленный непосредственно с помощью spark-submit
, должен работать.
В Scala 2.11+ вы можете создать пакет непосредственно в REPL с помощью paste -raw
.
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)
package bar
case class Bar(x: Int)
// Exiting paste mode, now interpreting.
scala> import bar.Bar
import bar.Bar
scala> sc.parallelize(Seq(Bar(1), Bar(1), Bar(2), Bar(2))).distinct.collect
res1: Array[bar.Bar] = Array(Bar(1), Bar(2))