Является ли Spark KMeans неспособным обрабатывать bigdata?
KMeans имеет несколько параметров для обучение, при этом режим инициализации по умолчанию равен kmeans ||. Проблема заключается в том, что он марширует быстро (менее 10 минут) до первых 13 этапов, но затем полностью зависает, не давая ошибки!
Минимальный пример, который воспроизводит проблему (она будет успешной, если я использую 1000 точек или случайную инициализацию):
from pyspark.context import SparkContext
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.random import RandomRDDs
if __name__ == "__main__":
sc = SparkContext(appName='kmeansMinimalExample')
# same with 10000 points
data = RandomRDDs.uniformVectorRDD(sc, 10000000, 64)
C = KMeans.train(data, 8192, maxIterations=10)
sc.stop()
Задача ничего не делает (это не удается, не удается или не прогрессирует..), как показано ниже. На вкладке "Исполнители" нет активных/неудачных задач. В журналах Stdout и Stderr нет ничего особенно интересного:
![введите описание изображения здесь]()
Если я использую k=81
вместо 8192, это будет успешным:
![введите описание изображения здесь]()
Обратите внимание, что два вызова takeSample()
, не должны быть проблемой, поскольку в случайном случае инициализации дважды вызывались.
Итак, что происходит? Является ли Spark Kmeans неспособным масштабировать? Кто-нибудь знает? Можете ли вы воспроизвести?
Если это была проблема с памятью, Я бы получил предупреждения и ошибки, как и раньше.
Примечание: комментарии placeybordeaux основаны на выполнении задания в режиме клиента, где конфигурации драйвера недействительны, вызывая код выхода 143 и т.д. (см. историю изменений), а не в режиме кластера, где не сообщается об ошибке вообще-то, приложение просто зависает.
От нуля323: Почему алгоритм Spark Mllib KMeans очень медленный? связан, но я думаю, что он демонстрирует некоторый прогресс, в то время как моя зависает, я оставил комментарий...
![введите описание изображения здесь]()
Ответы
Ответ 1
Я думаю, что "висит" потому, что твои исполнители продолжают умирать. Как я упоминал в боковом разговоре, этот код работает отлично для меня, локально и на кластере, в Pyspark и Scala. Однако это занимает намного больше времени, чем нужно. Это почти все время, проведенное в k-средствах || инициализации.
Я открыл https://issues.apache.org/jira/browse/SPARK-17389, чтобы отслеживать два основных улучшения, один из которых вы можете использовать сейчас. Изменить: действительно, см. Также https://issues.apache.org/jira/browse/SPARK-11560
Во-первых, есть некоторые оптимизации кода, которые бы ускорили init примерно на 13%.
Однако большая часть проблемы заключается в том, что она по умолчанию равна 5 шагам k-означает || init, когда кажется, что 2 почти всегда так же хорош. Вы можете установить шаги инициализации на 2, чтобы увидеть ускорение, особенно на той стадии, которая висит сейчас.
В моем (меньшем) тесте на моем ноутбуке время инициализации перешло с 5:54 до 1:41 с обоими изменениями, в основном из-за установки шагов инициализации.
Ответ 2
Если ваш RDD настолько велик, collectAsMap попытается скопировать каждый отдельный элемент в RDD в одну программу драйвера, а затем закончится нехватка памяти и сбой. Несмотря на то, что вы разделили данные, collectAsMap отправляет все драйверу, а вы - сбои в работе.
Вы можете убедиться, что количество возвращаемых элементов ограничено вызовом take или takeSample или, возможно, фильтрацией или выборкой вашего RDD.
Аналогичным образом, будьте осторожны с этими другими действиями, если вы не уверены, что ваш размер набора данных достаточно мал, чтобы вписаться в память:
countByKey,
countByValue,
сбор
Если вам действительно нужно каждое из этих значений RDD, и данные слишком велики, чтобы вписаться в память, вы можете записать RDD в файлы или экспортировать RDD в базу данных, которая достаточно велика, чтобы удерживать все данные. Поскольку вы используете API, я думаю, что вы не можете этого сделать (перепишите весь код, возможно, увеличьте память?). Я думаю, что этот метод collectAsMap в методе runAlgorithm - это очень плохо в Kmeans (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html)...