Spark дает StackOverflowError при обучении с использованием ALS

При попытке обучения модели машинного обучения с использованием ALS в Spark MLLib я продолжал получать StackoverflowError. Вот небольшой пример трассировки стека:

Traceback (most recent call last):
  File "/Users/user/Spark/imf.py", line 31, in <module>
    model = ALS.train(rdd, rank, numIterations)
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/recommendation.py", line 140, in train
    lambda_, blocks, nonnegative, seed)
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/common.py", line 120, in callMLlibFunc
    return callJavaFunc(sc, api, *args)
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/pyspark/mllib/common.py", line 113, in callJavaFunc
    return _java2py(sc, func(*args))
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/Cellar/apache-spark/1.3.1_1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.trainALSModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 35, localhost): java.lang.StackOverflowError
        at java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2296)
        at java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:2589)

Эта ошибка также появляется при попытке запустить .mean() для вычисления ошибки среднего квадрата. Он появился как в версии 1.3.1_1, так и в версии 1.4.1 от Spark. Я использовал PySpark, и увеличение доступной памяти не помогло.

Ответы

Ответ 1

Решение заключалось в том, чтобы добавить контрольную точку, которая не позволяет рекурсии, используемой кодовой базой, создавать переполнение. Сначала создайте новый каталог для хранения контрольных точек. Затем используйте SparkContext этот каталог для контрольной проверки. Вот пример в Python:

sc.setCheckpointDir('checkpoint/')

Вам также может потребоваться добавить контрольную точку в ALS, но я не смог определить, имеет ли это значение. Чтобы добавить контрольную точку (возможно, не нужно), просто выполните:

ALS.checkpointInterval = 2