Время икры иглы увеличивается экспоненциально при использовании соединения

Я новичок в Spark, и я пытаюсь реализовать некоторый итеративный алгоритм кластеризации (максимизация ожиданий) с помощью центроида, представленного моделью Маркова. Поэтому мне нужно делать итерации и присоединяться.

Одна из проблем, которые я испытываю, заключается в том, что каждый рост времени итераций экспоненциально. После некоторых экспериментов я обнаружил, что при выполнении итераций ему нужно было сохранить RDD, который будет использоваться повторно на следующей итерации, иначе каждая итерационная искра создаст план выполнения, который будет пересчитывать RDD с самого начала, тем самым увеличивая время вычисления.

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
#     init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

Результаты в:

0
10000000
0:00:04.283652
1
10000000
0:00:05.998830
2
10000000
0:00:08.771984
3
10000000
0:00:11.399581
4
10000000
0:00:14.206069
5
10000000
0:00:16.856993

Таким образом, добавление кеша() помогает и время итерации становиться постоянным.

init = sc.parallelize(xrange(10000000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))        
    init = init2.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)
0
10000000
0:00:04.966835
1
10000000
0:00:04.609885
2
10000000
0:00:04.324358
3
10000000
0:00:04.248709
4
10000000
0:00:04.218724
5
10000000
0:00:04.223368

Но когда вы входите в итерацию, проблема возвращается. Вот простой код, демонстрирующий проблему. Даже создание кеша при каждой трансформации RDD не решает проблему:

init = sc.parallelize(xrange(10000), 3)
init.cache()

for i in range(6):
    print i
    start = datetime.datetime.now()

    init2 = init.map(lambda n: (n, n*3))
    init2.cache()

    init3 = init.map(lambda n: (n, n*2))
    init3.cache()

    init4 = init2.join(init3)
    init4.count()
    init4.cache()

    init = init4.map(lambda n: n[0])
    init.cache()

    print init.count()    
    print str(datetime.datetime.now() - start)

И вот результат. Поскольку вы можете видеть, что время итерации растет экспоненциально: (

0
10000
0:00:00.674115
1
10000
0:00:00.833377
2
10000
0:00:01.525314
3
10000
0:00:04.194715
4
10000
0:00:08.139040
5
10000
0:00:17.852815

Я буду очень признателен за любую помощь:)

Ответы

Ответ 1

Резюме

Вообще говоря, итеративные алгоритмы, особенно с самосоединением или самосоединением, требуют контроля над:

Проблема, описанная здесь, является результатом отсутствия первой. В каждом итерационном номере раздела увеличивается с самосоединением, ведущим к экспоненциальному шаблону. Чтобы решить, что вам нужно либо контролировать количество разделов на каждой итерации (см. Ниже), либо использовать глобальные инструменты, такие как spark.default.parallelism (см. ответ, предоставленный Travis). В общем, первый подход обеспечивает гораздо больший контроль в целом и не влияет на другие части кода.

Оригинальный ответ:

Насколько я могу судить, здесь есть две проблемы с чередованием - увеличение количества разделов и перетасовка служебных данных во время соединений. Оба могут быть легко обработаны, поэтому давайте шаг за шагом.

Сначала создайте помощника для сбора статистики:

import datetime

def get_stats(i, init, init2, init3, init4,
       start, end, desc, cache, part, hashp):
    return {
        "i": i,
        "init": init.getNumPartitions(),
        "init1": init2.getNumPartitions(),
        "init2": init3.getNumPartitions(),
        "init4": init4.getNumPartitions(),
        "time": str(end - start),
        "timen": (end - start).seconds + (end - start).microseconds * 10 **-6,
        "desc": desc,
        "cache": cache,
        "part": part,
        "hashp": hashp
    }

другой помощник для обработки кеширования/разбиения

def procRDD(rdd, cache=True, part=False, hashp=False, npart=16):
    rdd = rdd if not part else rdd.repartition(npart)
    rdd = rdd if not hashp else rdd.partitionBy(npart)
    return rdd if not cache else rdd.cache()

извлечь логику конвейера:

def run(init, description, cache=True, part=False, hashp=False, 
    npart=16, n=6):
    times = []

    for i in range(n):
        start = datetime.datetime.now()

        init2 = procRDD(
                init.map(lambda n: (n, n*3)),
                cache, part, hashp, npart)
        init3 = procRDD(
                init.map(lambda n: (n, n*2)),
                cache, part, hashp, npart)


        # If part set to True limit number of the output partitions
        init4 = init2.join(init3, npart) if part else init2.join(init3) 
        init = init4.map(lambda n: n[0])

        if cache:
            init4.cache()
            init.cache()

        init.count() # Force computations to get time
        end = datetime.datetime.now() 

        times.append(get_stats(
            i, init, init2, init3, init4,
            start, end, description,
            cache, part, hashp
        ))

    return times

и создайте исходные данные:

ncores = 8
init = sc.parallelize(xrange(10000), ncores * 2).cache()

Объедините операцию самостоятельно, если аргумент numPartitions не указан, отрегулируйте количество разделов на выходе в зависимости от количества разделов входных RDD. Это означает увеличение количества разделов с каждой итерацией. Если количество разделов на большие вещи становится уродливым. Вы можете справиться с ними, предоставив аргумент numPartitions для объединения или разделения RDD с каждой итерацией.

timesCachePart = sqlContext.createDataFrame(
        run(init, "cache + partition", True, True, False, ncores * 2))
timesCachePart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+-----------------+
|i|init1|init2|init4|          time|             desc|
+-+-----+-----+-----+--------------+-----------------+
|0|   16|   16|   16|0:00:01.145625|cache + partition|
|1|   16|   16|   16|0:00:01.090468|cache + partition|
|2|   16|   16|   16|0:00:01.059316|cache + partition|
|3|   16|   16|   16|0:00:01.029544|cache + partition|
|4|   16|   16|   16|0:00:01.033493|cache + partition|
|5|   16|   16|   16|0:00:01.007598|cache + partition|
+-+-----+-----+-----+--------------+-----------------+

Как вы можете видеть, когда время выполнения перераспределения более или менее постоянное. Вторая проблема заключается в том, что выше данные разбиваются случайным образом. Чтобы обеспечить производительность соединения, мы хотели бы иметь одинаковые ключи для одного раздела. Для этого мы можем использовать хэш-разделитель:

timesCacheHashPart = sqlContext.createDataFrame(
    run(init, "cache + hashpart", True, True, True, ncores * 2))
timesCacheHashPart.select("i", "init1", "init2", "init4", "time", "desc").show()

+-+-----+-----+-----+--------------+----------------+
|i|init1|init2|init4|          time|            desc|
+-+-----+-----+-----+--------------+----------------+
|0|   16|   16|   16|0:00:00.946379|cache + hashpart|
|1|   16|   16|   16|0:00:00.966519|cache + hashpart|
|2|   16|   16|   16|0:00:00.945501|cache + hashpart|
|3|   16|   16|   16|0:00:00.986777|cache + hashpart|
|4|   16|   16|   16|0:00:00.960989|cache + hashpart|
|5|   16|   16|   16|0:00:01.026648|cache + hashpart|
+-+-----+-----+-----+--------------+----------------+

Время выполнения является постоянным, как и раньше, и небольшое улучшение по сравнению с основным разделением.

Теперь вы можете использовать кеш только как ссылку:

timesCacheOnly = sqlContext.createDataFrame(
    run(init, "cache-only", True, False, False, ncores * 2))
timesCacheOnly.select("i", "init1", "init2", "init4", "time", "desc").show()


+-+-----+-----+-----+--------------+----------+
|i|init1|init2|init4|          time|      desc|
+-+-----+-----+-----+--------------+----------+
|0|   16|   16|   32|0:00:00.992865|cache-only|
|1|   32|   32|   64|0:00:01.766940|cache-only|
|2|   64|   64|  128|0:00:03.675924|cache-only|
|3|  128|  128|  256|0:00:06.477492|cache-only|
|4|  256|  256|  512|0:00:11.929242|cache-only|
|5|  512|  512| 1024|0:00:23.284508|cache-only|
+-+-----+-----+-----+--------------+----------+

Как вы можете видеть количество разделов (init2, init3, init4) для версии с кешем удваивается с каждой итерацией, а время выполнения пропорционально количеству разделов.

Наконец, мы можем проверить, можем ли мы улучшить производительность при большом количестве разделов, если мы используем хэш-разделитель:

timesCacheHashPart512 = sqlContext.createDataFrame(
    run(init, "cache + hashpart 512", True, True, True, 512))
timesCacheHashPart512.select(
    "i", "init1", "init2", "init4", "time", "desc").show()
+-+-----+-----+-----+--------------+--------------------+
|i|init1|init2|init4|          time|                desc|
+-+-----+-----+-----+--------------+--------------------+
|0|  512|  512|  512|0:00:14.492690|cache + hashpart 512|
|1|  512|  512|  512|0:00:20.215408|cache + hashpart 512|
|2|  512|  512|  512|0:00:20.408070|cache + hashpart 512|
|3|  512|  512|  512|0:00:20.390267|cache + hashpart 512|
|4|  512|  512|  512|0:00:20.362354|cache + hashpart 512|
|5|  512|  512|  512|0:00:19.878525|cache + hashpart 512|
+-+-----+-----+-----+--------------+--------------------+

Улучшение не так впечатляет, но если у вас небольшой кластер и много данных, все равно стоит попробовать.

Я думаю, что убрать сообщение здесь - это вопрос разбиения. Существуют контексты, где он обрабатывается для вас (mllib, sql), но если вы используете операции низкого уровня, это ваша ответственность.

Ответ 2

Проблема заключается в том, что (поскольку нуль323 указал в своем тщательном ответе), что вызов объединения без указания количества разделов может (приводит) к увеличению числа разделов. Количество разделов может расти (по-видимому) без ограничений. Есть (по крайней мере) два способа предотвратить увеличение числа разделов (без привязки) при многократном вызове join.

Метод 1:

Как указывал ноль323, вы можете указать количество разделов вручную при вызове соединения. Например

rdd1.join(rdd2, numPartitions)

Это гарантирует, что количество разделов не будет превышать numPartitions и, в частности, количество разделов не будет постоянно расти.

Метод 2:

При создании SparkConf вы можете указать уровень по умолчанию parallelism. Если это значение установлено, тогда, когда вы вызываете такие функции, как join без указания numPartitions, вместо этого будет использоваться значение по умолчанию parallelism, эффективно ограничивающее количество разделов и предотвращающее их рост. Вы можете установить этот параметр как

conf=SparkConf.set("spark.default.parallelism", numPartitions)
sc = SparkContex(conf=conf)   

Ответ 3

Rdds неизменяемы. Попробуйте сделать rdd = rdd.cache()