Ответ 1
Резюме
Вообще говоря, итеративные алгоритмы, особенно с самосоединением или самосоединением, требуют контроля над:
- Длина линии (см., например, Stackoverflow из-за длительной RDD Lineage и unionAll, что приводит к StackOverflow).
- Количество разделов.
Проблема, описанная здесь, является результатом отсутствия первой. В каждом итерационном номере раздела увеличивается с самосоединением, ведущим к экспоненциальному шаблону. Чтобы решить, что вам нужно либо контролировать количество разделов на каждой итерации (см. Ниже), либо использовать глобальные инструменты, такие как spark.default.parallelism
(см. ответ, предоставленный Travis). В общем, первый подход обеспечивает гораздо больший контроль в целом и не влияет на другие части кода.
Оригинальный ответ:
Насколько я могу судить, здесь есть две проблемы с чередованием - увеличение количества разделов и перетасовка служебных данных во время соединений. Оба могут быть легко обработаны, поэтому давайте шаг за шагом.
Сначала создайте помощника для сбора статистики:
import datetime
def get_stats(i, init, init2, init3, init4,
start, end, desc, cache, part, hashp):
return {
"i": i,
"init": init.getNumPartitions(),
"init1": init2.getNumPartitions(),
"init2": init3.getNumPartitions(),
"init4": init4.getNumPartitions(),
"time": str(end - start),
"timen": (end - start).seconds + (end - start).microseconds * 10 **-6,
"desc": desc,
"cache": cache,
"part": part,
"hashp": hashp
}
другой помощник для обработки кеширования/разбиения
def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
rdd = rdd if not part else rdd.repartition(npart)
rdd = rdd if not hashp else rdd.partitionBy(npart)
return rdd if not cache else rdd.cache()
извлечь логику конвейера:
def run(init, description, cache=True, part=False, hashp=False,
npart=16, n=6):
times = []
for i in range(n):
start = datetime.datetime.now()
init2 = procRDD(
init.map(lambda n: (n, n*3)),
cache, part, hashp, npart)
init3 = procRDD(
init.map(lambda n: (n, n*2)),
cache, part, hashp, npart)
# If part set to True limit number of the output partitions
init4 = init2.join(init3, npart) if part else init2.join(init3)
init = init4.map(lambda n: n[0])
if cache:
init4.cache()
init.cache()
init.count() # Force computations to get time
end = datetime.datetime.now()
times.append(get_stats(
i, init, init2, init3, init4,
start, end, description,
cache, part, hashp
))
return times
и создайте исходные данные:
ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()
Объедините операцию самостоятельно, если аргумент numPartitions
не указан, отрегулируйте количество разделов на выходе в зависимости от количества разделов входных RDD. Это означает увеличение количества разделов с каждой итерацией. Если количество разделов на большие вещи становится уродливым. Вы можете справиться с ними, предоставив аргумент numPartitions
для объединения или разделения RDD с каждой итерацией.
timesCachePart = sqlContext.createDataFrame(
run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+-----------------+
|0| 16| 16| 16|0:00:01.145625|cache + partition|
|1| 16| 16| 16|0:00:01.090468|cache + partition|
|2| 16| 16| 16|0:00:01.059316|cache + partition|
|3| 16| 16| 16|0:00:01.029544|cache + partition|
|4| 16| 16| 16|0:00:01.033493|cache + partition|
|5| 16| 16| 16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+
Как вы можете видеть, когда время выполнения перераспределения более или менее постоянное. Вторая проблема заключается в том, что выше данные разбиваются случайным образом. Чтобы обеспечить производительность соединения, мы хотели бы иметь одинаковые ключи для одного раздела. Для этого мы можем использовать хэш-разделитель:
timesCacheHashPart = sqlContext.createDataFrame(
run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+----------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+----------------+
|0| 16| 16| 16|0:00:00.946379|cache + hashpart|
|1| 16| 16| 16|0:00:00.966519|cache + hashpart|
|2| 16| 16| 16|0:00:00.945501|cache + hashpart|
|3| 16| 16| 16|0:00:00.986777|cache + hashpart|
|4| 16| 16| 16|0:00:00.960989|cache + hashpart|
|5| 16| 16| 16|0:00:01.026648|cache + hashpart|
+-+-----+-----+-----+--------------+----------------+
Время выполнения является постоянным, как и раньше, и небольшое улучшение по сравнению с основным разделением.
Теперь вы можете использовать кеш только как ссылку:
timesCacheOnly = sqlContext.createDataFrame(
run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+----------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+----------+
|0| 16| 16| 32|0:00:00.992865|cache-only|
|1| 32| 32| 64|0:00:01.766940|cache-only|
|2| 64| 64| 128|0:00:03.675924|cache-only|
|3| 128| 128| 256|0:00:06.477492|cache-only|
|4| 256| 256| 512|0:00:11.929242|cache-only|
|5| 512| 512| 1024|0:00:23.284508|cache-only|
+-+-----+-----+-----+--------------+----------+
Как вы можете видеть количество разделов (init2, init3, init4) для версии с кешем удваивается с каждой итерацией, а время выполнения пропорционально количеству разделов.
Наконец, мы можем проверить, можем ли мы улучшить производительность при большом количестве разделов, если мы используем хэш-разделитель:
timesCacheHashPart512 = sqlContext.createDataFrame(
run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
"i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4| time| desc|
+-+-----+-----+-----+--------------+--------------------+
|0| 512| 512| 512|0:00:14.492690|cache + hashpart 512|
|1| 512| 512| 512|0:00:20.215408|cache + hashpart 512|
|2| 512| 512| 512|0:00:20.408070|cache + hashpart 512|
|3| 512| 512| 512|0:00:20.390267|cache + hashpart 512|
|4| 512| 512| 512|0:00:20.362354|cache + hashpart 512|
|5| 512| 512| 512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+
Улучшение не так впечатляет, но если у вас небольшой кластер и много данных, все равно стоит попробовать.
Я думаю, что убрать сообщение здесь - это вопрос разбиения. Существуют контексты, где он обрабатывается для вас (mllib
, sql
), но если вы используете операции низкого уровня, это ваша ответственность.