Соединяет ли параллель в скважине Spark?

Я запускаю довольно небольшую программу Spark с несколькими операциями map и reduceByKey по очень маленькому набору данных менее 400 МБ.

В какой-то момент у меня есть RDD кортежей, которые я хочу сортировать, и я называю sortByKey. Это самая медленная часть моей программы. Все остальное, кажется, работает почти мгновенно, но это занимает до 20 секунд.

Проблема в том, что она занимает 20 секунд на моем ноутбуке, а также в кластере машин AWS m3.large. Я пробовал с 1, 2 и 3 подчиненными, и различия во времени выполнения очень малы. Ganglia и искровой веб-консоль показывают, что CPU и память используются для максимальной емкости во всех ведомых устройствах, поэтому я думаю, что конфигурация в порядке.

Я также нашел проблему выполнения, прежде чем я ожидал, но затем прочитал этот поток, что указывает на открытую проблему в Spark. Я не думаю, что это было полностью связано.

Является ли sortByKey неотвратимо медленным, и неважно, сколько узлов я добавляю, он будет определять минимальное время выполнения моей программы? Надеюсь, нет, и есть что-то, что я делаю неправильно и могу быть исправлено.

ИЗМЕНИТЬ

Оказывается, что то, что я видел, было связано с той ссылкой, которую я опубликовал. sortByKey просто оказался первым действием (задокументированным как преобразование), и казалось, что программа медленная при сортировке, но на самом деле сортировка выполняется довольно быстро. Проблема заключается в предыдущей операции join.

Все, что я сказал, применяется, изменяя сортировку с соединением. Почему время выполнения не уменьшается, когда я добавляю больше узлов (или numTask к функции соединения), и почему это не лучше, чем простое соединение SQL? Я обнаружил кто-то еще, столкнувшийся с этой проблемой, но ответа нет, кроме предложения сериализации настройки, что я действительно не думаю, что это мой случай.

Ответы

Ответ 1

Соединение по сути является тяжелой операцией, потому что значения с одинаковыми ключами должны быть перемещены на один и тот же компьютер (сетевой перетасовки). Добавление большего количества узлов просто добавит дополнительные накладные расходы ввода-вывода.

Я могу думать о двух вещах:

Вариант 1

Если вы присоединяетесь к большому набору данных с меньшим, он может заплатить за передачу меньшего набора данных:

val large = sc.textFile("large.txt").map(...) 
val smaller = sc.textFile("smaller.txt").collect().toMap() 
val bc = sc.broadcast(smaller)

И затем выполните "ручное объединение":

large.map(x => (x.value, bc.value(x.value)))

Это описано более подробно в этой расширенной презентации Spark.

Вариант 2

Вы можете переделать небольшой набор данных, используя тот же самый разделитель, что и большой (т.е. убедитесь, что аналогичные ключи находятся на одном компьютере). Таким образом, настройте разбиение небольшого набора на соответствие разбиению большого.

Это приведет к тасованию только небольшого набора. После правильной разбивки соединение должно быть довольно быстрым, так как оно будет выполняться локально в каждом кластере node.