Многопроцессорная обработка Python: понимание логики "размера фрагмента"

Какие факторы определяют оптимальный аргумент chunksize для таких методов, как multiprocessing.Pool.map()? Метод .map() кажется, использует произвольную эвристику для размера по умолчанию (объяснено ниже); что мотивирует этот выбор и есть ли более продуманный подход, основанный на конкретной ситуации/настройке?

Пример - скажи, что я

  • Передача iterable в .map(), содержащего ~ 15 миллионов элементов;
  • Работа на машине с 24 ядрами и использование processes = os.cpu_count() по умолчанию processes = os.cpu_count() в multiprocessing.Pool().

Мое наивное мышление состоит в том, чтобы дать каждому из 24 работников одинаковый размер, то есть 15_000_000/24 или 625 000. Большие куски должны уменьшить текучесть кадров/накладные расходы при полном использовании всех работников. Но, похоже, что в нем отсутствуют некоторые потенциальные недостатки предоставления больших партий каждому работнику. Это неполная картина, и что мне не хватает?


Часть моего вопроса проистекает из логики по умолчанию для if chunksize=None: оба .map() и .starmap() вызывают .map_async(), который выглядит следующим образом:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize 'iterable' to list if it an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

Какая логика стоит за divmod(len(iterable), len(self._pool) * 4)? Это означает, что размер фрагмента будет ближе к 15_000_000/(24 * 4) == 156_250. Каково намерение умножить len(self._pool) на 4?

Это делает полученный размер фрагмента в 4 раза меньше, чем моя "наивная логика" сверху, которая состоит из простого деления длины итерируемого на количество работников в pool._pool.

Наконец, есть еще один фрагмент из документации Python по .imap() который еще больше .imap() мое любопытство:

Аргумент chunksize аргументом, используемым методом map(). Для очень длинных итераций использование большого значения для chunksize может сделать задание намного быстрее, чем использование значения по умолчанию 1.


Соответствующий ответ, который полезен, но слишком высокоуровневый: многопроцессорность Python: почему большие куски медленнее?,

Ответы

Ответ 1

Короткий ответ

Пул chunksize-алгоритм является эвристическим. Он предоставляет простое решение для всех возможных проблемных сценариев, которые вы пытаетесь внедрить в методы пула. Как следствие, он не может быть оптимизирован для какого-либо конкретного сценария.

Алгоритм произвольно делит итерируемое примерно на четыре раза больше фрагментов, чем наивный подход. Чем больше блоков, тем больше накладных расходов, но повышается гибкость планирования. Как этот ответ покажет, это приводит к более высокой загрузке работника в среднем, но без гарантии более короткого общего времени вычислений для каждого случая.

"Приятно знать, - подумаете вы, - но как знание этого помогает мне в решении конкретных проблем с многопроцессорностью?" Ну, это не так. Более честный короткий ответ: "короткого ответа нет", "многопроцессорная обработка сложна" и "это зависит". Наблюдаемый симптом может иметь разные корни даже для сходных сценариев.

Этот ответ пытается дать вам базовые понятия, помогающие получить более четкое представление о черном ящике планирования пула. Он также пытается дать вам некоторые базовые инструменты для распознавания и предотвращения потенциальных обрывов, поскольку они связаны с размером кусков.

Оглавление

Часть I

  1. Определения
  2. Цели распараллеливания
  3. Сценарии распараллеливания
  4. Риски Chunksize> 1
  5. Пул Chunksize-Алгоритм
  6. Количественная оценка эффективности алгоритма

    6.1 Модели

    6.2 Параллельное расписание

    6.3 Эффективность

    6.3.1 Абсолютная эффективность распределения (ADE)

    6.3.2 Относительная эффективность распределения (RDE)

Часть II

  1. Наивный и пул Chunksize-Алгоритм
  2. Проверка на практике
  3. Заключение

Сначала необходимо уточнить некоторые важные термины.


1. Определения


ломоть

Чанк здесь - это доля iterable -argument, указанного в вызове пула -method. Как вычисляется размер фрагмента и как это может повлиять, - тема этого ответа.


задача

Физическое представление задачи в рабочем процессе в терминах данных можно увидеть на рисунке ниже.

figure0

На рисунке показан пример вызова функции pool.map(), отображаемый вдоль строки кода, взятой из функции multiprocessing.pool.worker, где задача, считанная из inqueue распаковывается. worker является основным основным -function в MainThread пула-рабочего процесса. func -argument указано в бассейне -method будет соответствовать только func -variable внутри worker -function для методов одного вызова, как apply_async и для imap с chunksize=1. Для остальной части бассейна -method S с chunksize -parameter обработка -function func будет картостроитель -function (mapstar или starmapstar). Эта функция отображает пользовательский func -parameter на каждый элемент передаваемого фрагмента итерируемого (-> "map-tasks"). Время, которое требуется, определяет задачу также как единицу работы.


Taskel

В то время как использование слова "задача" для всей обработки одного блока соответствует коду в multiprocessing.pool, нет никаких указаний на то, как один вызов func, указанной пользователем, с одним элементом блока в качестве аргумента (ов).), следует упомянуть. Чтобы избежать путаницы, возникающей из-за конфликтов имен (подумайте о maxtasksperchild -parameter для пула __init__ -method), этот ответ будет относиться к отдельным единицам работы в рамках задачи как к Taskel.

Задача (из задачи + элемент) - это наименьшая единица работы в задаче. Это однократное выполнение функции, указанной в func -parameter Pool -method, вызываемой с аргументами, полученными из одного элемента переданного фрагмента. Задача состоит из chunksize большого размера.


Распределение издержек (PO)

PO состоит из внутренних издержек Python и служебных данных для межпроцессного взаимодействия (IPC). Служебная нагрузка для каждой задачи в Python поставляется с кодом, необходимым для упаковки и распаковки задач и их результатов. Служебная нагрузка IPC сопровождается необходимой синхронизацией потоков и копированием данных между различными адресными пространствами (требуется два шага копирования: parent → queue → child). Количество накладных расходов IPC зависит от OS-, hardware- и размера данных, что затрудняет обобщение воздействия.


2. Цели распараллеливания

При использовании многопроцессорной обработки нашей общей целью (очевидно) является минимизация общего времени обработки для всех задач. Для достижения этой общей цели нашей технической задачей должна быть оптимизация использования аппаратных ресурсов.

Некоторые важные подцели для достижения технической цели:

  • минимизировать издержки распараллеливания (наиболее известный, но не один: IPC)
  • высокая загрузка всех процессорных ядер
  • ограничение использования памяти для предотвращения чрезмерного подкачки ОС (очистки)

Сначала задачи должны быть достаточно сложными (интенсивными) в вычислительном отношении, чтобы вернуть ПО, которое мы должны заплатить за распараллеливание. Актуальность ПО уменьшается с увеличением абсолютного времени вычислений на одно задание. Или, другими словами, чем больше абсолютное время вычислений для каждой задачи, тем менее значимой становится потребность в сокращении ПО. Если ваши вычисления займут несколько часов на одну задачу, накладные расходы IPC будут незначительными по сравнению с ними. Основной задачей здесь является предотвращение простоя рабочих процессов после распределения всех задач. Держа все ядра загруженными, значит, мы распараллеливаемся как можно больше.


3. Сценарии распараллеливания

Какие факторы определяют оптимальный аргумент размера фрагмента для таких методов, как multiprocessing.Pool.map()

Основным фактором, о котором идет речь, является то, сколько времени вычислений может варьироваться в зависимости от наших задач. Чтобы назвать это, выбор оптимального размера кусочка определяется...

Коэффициент вариации (CV) для расчета времени на одно задание.

Два крайних сценария в масштабе, следующие из степени этого изменения:

  1. Для всех задач требуется одинаковое время вычислений.
  2. Задача может занять несколько секунд или дней, чтобы закончить.

Для лучшей запоминаемости я буду ссылаться на эти сценарии как:

  1. Плотный сценарий
  2. Широкий сценарий


Плотный сценарий

В плотном сценарии было бы желательно распределить все задачи одновременно, чтобы свести к минимуму необходимое IPC и переключение контекста. Это означает, что мы хотим создать только столько кусков, сколько рабочих процессов существует. Как уже указывалось выше, вес ПО увеличивается с более коротким временем вычислений на одно задание.

Для максимальной пропускной способности мы также хотим, чтобы все рабочие процессы были заняты до тех пор, пока не будут выполнены все задачи (без рабочих на холостом ходу). Для этой цели распределенные порции должны быть одинакового размера или близки к.


Широкий сценарий

Основным примером для широкого сценария будет проблема оптимизации, когда результаты либо быстро сходятся, либо вычисления могут занять часы, если не дни. Обычно невозможно предсказать, какую смесь "легких задач" и "тяжелых задач" будет содержать задание в таком случае, поэтому не рекомендуется распределять слишком много задач одновременно в пакете задач. Распределение меньшего количества задач одновременно, чем это возможно, означает увеличение гибкости планирования. Это необходимо для достижения нашей подцели высокой эффективности использования всех ядер.

Если методы Pool по умолчанию будут полностью оптимизированы для плотного сценария, они будут все больше создавать неоптимальные временные характеристики для каждой проблемы, расположенной ближе к широкому сценарию.


4. Риски Chunksize> 1

Рассмотрим пример упрощенного псевдокода широкого сценария -iterable, который мы хотим передать в пул -method:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

Вместо реальных значений мы притворяемся, что видим необходимое время вычисления в секундах, для простоты только 1 минуту или 1 день. Мы предполагаем, что в пуле четыре рабочих процесса (на четырех ядрах), а для chunksize задано значение 2. Поскольку порядок будет сохранен, куски, отправленные рабочим, будут такими:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

Поскольку у нас достаточно рабочих и время вычислений достаточно велико, мы можем сказать, что каждый рабочий процесс получит кусок, над которым он будет работать в первую очередь. (Это не должно иметь место для быстрого выполнения задач). Далее мы можем сказать, что вся обработка займет около 86400 + 60 секунд, потому что это наибольшее общее время вычислений для порции в этом искусственном сценарии, и мы распределяем порции только один раз.

Теперь рассмотрим эту итерацию, в которой только один элемент меняет свою позицию по сравнению с предыдущей итерацией:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

... и соответствующие куски:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

Просто неудача с сортировкой нашей итерации почти вдвое (86400 + 86400) нашего общего времени обработки! Рабочий, получающий порочный (86400, 86400) -chunk, блокирует вторую тяжелую задачу в своей задаче, чтобы распределить его одному из рабочих на холостом ходу, уже покончившим с (60, 60) -chunk. Мы, очевидно, не рискнули бы таким неприятным исходом, если бы мы установили chunksize=1.

Это риск больших кусков. С более высокими размерами блоков мы торгуем гибкостью планирования за меньшие накладные расходы, а в случаях, подобных описанным выше, это плохая сделка.

Как мы увидим в главе 6. Количественная оценка эффективности алгоритма, большие размеры фрагментов также могут привести к неоптимальным результатам для плотных сценариев.


5. Пул Chunksize-Алгоритм

Ниже вы найдете слегка измененную версию алгоритма внутри исходного кода. Как видите, я обрезал нижнюю часть и обернул ее в функцию для внешнего вычисления аргумента chunksize. Я также заменил 4 на factor и передал вызовы len().

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within 'multiprocessing.pool.Pool._map_async'.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

Чтобы убедиться, что мы все на одной странице, вот что делает divmod:

divmod(x, y) - встроенная функция, которая возвращает (x//y, x%y). x//y - это деление по полу, которое возвращает округленное вниз число от x/y, а x % y - это операция по модулю, возвращающая остаток от x/y. Следовательно, например, divmod(10, 3) возвращает (3, 1).

Теперь, когда вы посмотрите на chunksize, extra = divmod(len_iterable, n_workers * 4), вы заметите, что n_workers - это делитель y по x/y и умножение на 4, без дальнейшей корректировки, if extra: chunksize +=1, приводит к начальному размеру фрагмента по крайней мере в четыре раза меньше (для len_iterable >= n_workers * 4), чем это было бы в противном случае.

Для просмотра эффекта умножения на 4 на промежуточный результат размера фрагмента рассмотрим следующую функцию:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool stage-1 chunksize and the chunksize
    for Pool complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

Вышеуказанная функция вычисляет наивный размер фрагмента (cs_naive) и размер cs_naive первого шага алгоритма размера фрагмента пула (cs_pool1), а также размер фрагмента для полного алгоритма cs_pool2 (cs_pool2). Далее он вычисляет реальные коэффициенты rf_pool1 = cs_naive/cs_pool1 и rf_pool2 = cs_naive/cs_pool2, которые говорят нам, во сколько раз наивно рассчитанные размеры фрагментов больше, чем внутренние версии пула.

Ниже вы видите две фигуры, созданные с помощью этой функции. Левый рисунок просто показывает размеры фрагментов для n_workers=4 вплоть до итерированной длины до 500. На правом рисунке показаны значения для rf_pool1. Для итерируемой длины 16 реальный фактор становится >=4 (для len_iterable >= n_workers * 4), и его максимальное значение равно 7 для итерируемой длины 28-31. Это значительное отклонение от исходного фактора 4, к которому сходится алгоритм для более длинных итераций. "Более длинный" здесь относительный и зависит от количества указанных работников.

figure1

Помните, что в chunksize cs_pool1 прежнему отсутствует extra -adjustment с остатком от divmod содержащимся в cs_pool2 из полного алгоритма.

Алгоритм продолжается с:

if extra:
    chunksize += 1

Теперь в случаях, когда есть остаток (extra от divmod-операции), увеличение размера фрагмента на 1, очевидно, не может сработать для каждой задачи. В конце концов, если бы это было так, не было бы остатка для начала.

Как вы можете видеть на рисунках ниже, " дополнительная обработка " приводит к тому, что реальный коэффициент для rf_pool2 теперь сходится к 4 ниже 4 а отклонение несколько плавнее. Стандартное отклонение для n_workers=4 и len_iterable=500 падает с 0.5233 для rf_pool1 до 0.4115 для rf_pool2.

figure2

В конце концов, увеличение chunksize на 1 приводит к тому, что последняя переданная задача имеет размер len_iterable % chunksize or chunksize.

Однако чем интереснее и как мы увидим позже, тем больше эффект дополнительной обработки можно наблюдать для числа сгенерированных кусков (n_chunks). Для достаточно длинных итераций алгоритм завершения пула chunksize (n_pool2 на рисунке ниже) стабилизирует количество фрагментов при n_chunks == n_workers * 4. Напротив, наивный алгоритм (после первоначальной отрыжки) продолжает чередоваться между n_chunks == n_workers и n_chunks == n_workers + 1 мере увеличения длины итерируемого.

figure3

Ниже вы найдете две улучшенные данные -function для пула и простой алгоритм chunksize. Вывод этих функций будет необходим в следующей главе.

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # '+ (len_iterable % chunksize > 0)' exploits that 'True == 1'
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit '0 == False'
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

Не смущайтесь, возможно, неожиданным взглядом calc_naive_chunksize_info. extra от divmod не используется для вычисления размера фрагмента.

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6. Количественная оценка эффективности алгоритма

Теперь, после того, как мы увидели, как выходные данные алгоритма chunksize Pool выглядят иначе, чем выходные данные наивного алгоритма...

  • Как определить, действительно ли подход к пулу улучшает что-либо?
  • И что именно это может быть?

Как показано в предыдущей главе, для более длинных итераций (большее число задач) алгоритм пула chunksize-алгоритм приблизительно делит итерируемое на четыре раза больше фрагментов, чем наивный метод. Меньшие куски означают больше задач, а больше задач - больше издержек распараллеливания (PO), затраты, которые должны быть сопоставлены с преимуществом повышенной гибкости планирования (вспомните "Риски Chunksize> 1").

По довольно очевидным причинам базовый алгоритм chunksize пула не может сравниться с нами в гибкости планирования и PO. Служебные расходы IPC зависят от OS-, hardware- и размера данных. Алгоритм не может знать, на каком оборудовании мы запускаем наш код, и не имеет понятия, сколько времени займет выполнение задачи. Это эвристика, обеспечивающая базовую функциональность для всех возможных сценариев. Это означает, что он не может быть оптимизирован для какого-либо конкретного сценария. Как упоминалось ранее, ПО также становится все менее важной для увеличения времени вычислений на одно задание (отрицательная корреляция).

Когда вы вспоминаете Цели распараллеливания из главы 2, одним из пунктов было следующее:

  • высокая загрузка всех процессорных ядер

Вышеупомянутый алгоритм пула chunksize-алгоритма, который можно попытаться улучшить, - это минимизация простоя рабочих процессов и, соответственно, использование процессорных ядер.

Повторяющийся вопрос о SO в отношении multiprocessing.Pool. Люди задаются вопросом о неиспользуемых ядрах/неработающих рабочих процессах в ситуациях, когда можно ожидать, что все рабочие процессы заняты. Хотя на это может быть много причин, простаивающие рабочие процессы ближе к концу вычислений - это наблюдение, которое мы часто можем сделать, даже с плотными сценариями (равное время вычислений на одно задание) в тех случаях, когда число работников не является делителем числа. кусков (n_chunks % n_workers > 0).

Вопрос сейчас:

Как мы можем практически перевести наше понимание размеров кусков во что-то, что позволяет нам объяснить наблюдаемое использование работника, или даже сравнить эффективность различных алгоритмов в этом отношении?


6.1 Модели

Для получения более глубокого понимания здесь нам нужна форма абстракции параллельных вычислений, которая упрощает чрезмерно сложную реальность до управляемой степени сложности, сохраняя при этом значение в определенных границах. Такая абстракция называется моделью. Реализация такой " модели распараллеливания" (PM) генерирует метаданные (временные метки), отображаемые рабочим, как реальные вычисления, если бы данные собирались. Сгенерированные моделью метаданные позволяют прогнозировать метрики параллельных вычислений при определенных ограничениях.

figure4

Одной из двух подмоделей в определенном здесь PM является модель распределения (DM). DM объясняет, как атомные единицы работы (Taskels) распределяются по параллельным рабочим и времени, когда нет других факторов, кроме соответствующего алгоритма chunksize, количества рабочих, входных данных -iterable (количество Taskels) и их длительности вычислений считается. Это означает, что любая форма накладных расходов не включена.

Для получения полного PM DM расширяется с помощью модели служебных данных (OM), представляющей различные формы служебных данных параллелизации (PO). Такая модель должна быть откалибрована для каждого узла отдельно (зависимости hardware-, OS-). Сколько форм служебных данных представлено в ОМ, остается открытым, и поэтому могут существовать несколько ОМ с различной степенью сложности. Какой уровень точности необходим для реализации OM, определяется общим весом PO для конкретного вычисления. Более короткие задачи приводят к большему весу ПО, что, в свою очередь, требует более точного ОМ, если мы пытались предсказать эффективность распараллеливания (PE).


6.2 Параллельное расписание (PS)

Параллельное расписание - это двумерное представление параллельных вычислений, где ось X представляет время, а ось Y представляет пул параллельных рабочих. Количество рабочих и общее время вычислений отмечают протяженность прямоугольника, в котором нарисованы меньшие прямоугольники. Эти меньшие прямоугольники представляют атомные единицы работы (задачи).

Ниже вы найдете визуализацию PS, построенную с данными из алгоритма DM chunksize для пула для плотного сценария.

figure5

  • Ось X разделена на равные единицы времени, где каждая единица соответствует времени вычисления, которое требуется для задачи.
  • Ось Y делится на количество рабочих процессов, которые использует пул.
  • Задача здесь отображается как наименьший прямоугольник голубого цвета, помещенный во временную шкалу (график) анонимного рабочего процесса.
  • Задача - это одна или несколько задач на рабочем графике, которые постоянно выделяются одним и тем же оттенком.
  • Единицы времени холостого хода представлены красным цветом.
  • Параллельное расписание разбито на разделы. Последний раздел является хвостовой частью.

Названия составных частей можно увидеть на картинке ниже.

figure6

В полном PM, включая OM, доля холостого хода не ограничивается хвостом, но также включает пространство между задачами и даже между задачами.


6.3 Эффективность

Замечания:

Начиная с более ранних версий этого ответа, "Эффективность распараллеливания (PE)" была переименована в "Эффективность распределения (DE)". PE теперь относится к накладным расходам, включая эффективность.

Представленные выше модели позволяют количественно оценить коэффициент использования работника. Мы можем различить:

  • Эффективность распределения (DE) - рассчитывается с помощью DM (или упрощенного метода для плотного сценария).
  • Эффективность распараллеливания (PE) - либо рассчитывается с помощью калиброванного PM (прогноз), либо рассчитывается на основе метаданных реальных вычислений.

Важно отметить, что вычисленные коэффициенты полезного действия не коррелируют автоматически с более быстрым общим вычислением для данной проблемы распараллеливания. Использование работника в этом контексте различает только работника, у которого есть начальная, но незаконченная задача, и работника, у которого нет такой "открытой" задачи. Это означает, что возможный холостой ход в течение промежутка времени не регистрируется.

Все вышеупомянутые эффективности в основном получаются путем вычисления коэффициента деления Busy Share/Parallel Schedule. Разница между DE и PE заключается в том, что Busy Share занимает меньшую часть общего параллельного расписания для расширенного PM.

Далее в этом ответе будет обсуждаться только простой метод расчета DE для плотного сценария. Этого достаточно для сравнения различных алгоритмов chunksize, так как...

  1. ... DM - это часть PM, которая меняется в зависимости от используемых алгоритмов chunksize.
  2. ... Плотный сценарий с равной продолжительностью вычислений на одно задание отображает "стабильное состояние", для которого эти промежутки времени выпадают из уравнения. Любой другой сценарий может привести к случайным результатам, так как порядок задач будет иметь значение.

6.3.1 Абсолютная эффективность распределения (ADE)

Эту базовую эффективность можно рассчитать в целом, разделив занятую долю на весь потенциал параллельного расписания:

Абсолютная эффективность распределения (ADE)= занятая доля/параллельное расписание

Для плотного сценария упрощенный код расчета выглядит следующим образом:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    'len_iterable' is not used, but contained to keep a consistent signature
    with 'calc_rde'.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

Если доля холостого хода отсутствует, доля занятого будет равна параллельному расписанию, следовательно, мы получим ADE в размере 100%. В нашей упрощенной модели это сценарий, в котором все доступные процессы будут заняты все время, необходимое для обработки всех задач. Другими словами, вся работа эффективно распределяется до 100 процентов.

Но почему я продолжаю называть PE абсолютным PE здесь?

Чтобы понять это, мы должны рассмотреть возможный случай для размера фрагмента (cs), который обеспечивает максимальную гибкость планирования (также, число Горцев может быть. Совпадение?):

___________________________________ ~ ONE ~ ___________________________________

Если, например, у нас есть четыре рабочих процесса и 37 задач, рабочие будут работать на холостом ходу даже с chunksize=1 только потому, что n_workers=4 не является делителем 37. Остальная часть деления n_workers=4 равна 1. Этот единственный Оставшаяся часть работы должна быть обработана единственным работником, а остальные три - на холостом ходу.

Кроме того, еще будет один рабочий на холостом ходу с 39 задачами, как вы можете видеть на рисунке ниже.

figure7

Когда вы сравните верхнее параллельное расписание для chunksize=1 с приведенной ниже версией для chunksize=3, вы заметите, что верхнее параллельное расписание меньше, а временная шкала на оси x короче. Теперь должно стать очевидным, как неожиданно большие куски также могут привести к увеличению общего времени вычислений даже для плотных сценариев.

Но почему бы просто не использовать длину оси X для расчетов эффективности?

Потому что накладные расходы не содержатся в этой модели. Это будет отличаться для обоих кусков, следовательно, ось X не является прямо сопоставимой. Затраты по-прежнему могут привести к увеличению общего времени вычислений, как показано в случае 2 на рисунке ниже.

figure8


6.3.2 Относительная эффективность распределения (RDE)

Значение ADE не содержит информацию о том, возможно ли лучшее распределение задач с размером фрагмента, равным 1. Лучшее значение здесь по-прежнему означает меньшую долю холостого хода.

Чтобы получить значение DE, скорректированное на максимально возможное значение DE, мы должны разделить рассмотренную ADE на ADE, chunksize=1 для chunksize=1.

Относительная эффективность распределения (RDE)= ADE_cs_x/ADE_cs_1

Вот как это выглядит в коде:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE, как определено здесь, по сути является рассказом о хвосте параллельного расписания. На RDE влияет максимально эффективный размер кусочка, содержащийся в хвосте. (Этот хвост может иметь длину chunksize или last_chunk оси chunksize.) Это приводит к тому, что RDE естественным образом сходится к 100% (даже) для всех видов "хвостов", как показано на рисунке ниже.

figure9

Низкий RDE...

  • является сильным намеком на оптимизацию потенциала.
  • естественно, становится менее вероятным для более длинных итераций, потому что относительная хвостовая часть общего параллельного расписания сокращается.

найти часть II этого ответа здесь ниже.

Ответ 2

Об этом ответе

Этот ответ является частью II принятого ответа выше.


7. Наивный и пул Chunksize-Алгоритм

Прежде чем углубляться в детали, рассмотрим две картинки ниже. Для диапазона разных iterable длин они показывают, как два сравниваемых алгоритма iterable на iterable переданную iterable (к тому времени это будет последовательность) и как могут быть распределены результирующие задачи. Порядок работников является случайным, и количество распределенных задач на одного работника в действительности может отличаться от этого изображения для легких задач и/или задач в широком сценарии. Как упоминалось ранее, накладные расходы также не включены сюда. Однако для достаточно тяжелых задач в плотном сценарии с пренебрежимо малым размером передаваемых данных реальные вычисления дают очень похожую картину.

cs_4_50

cs_200_250

Как показано в главе " 5. Алгоритм пула Chunksize ", с помощью алгоритма пула chunksize число чанков стабилизируется при n_chunks == n_workers * 4 для достаточно больших итераций, при этом он продолжает переключаться между n_chunks == n_workers и n_chunks == n_workers + 1 с наивным подходом. Для наивного алгоритма применяется: Поскольку n_chunks % n_workers == 1 имеет значение True для n_chunks == n_workers + 1, будет создан новый раздел, где будет работать только один работник.

Наивный Chunksize-Алгоритм:

Вы можете подумать, что создали задачи с одинаковым количеством рабочих, но это будет справедливо только в тех случаях, когда нет остатка для len_iterable/n_workers. Если есть остаток, будет новый раздел только с одной задачей для одного работника. В этот момент ваши вычисления больше не будут параллельными.

Ниже вы видите фигуру, аналогичную показанной в главе 5, но показывающую количество секций вместо количества фрагментов. Для алгоритма полного размера пула (n_pool2) n_sections стабилизируется при печально известном, жестко закодированном факторе 4. Для наивного алгоритма n_sections будет чередоваться между одним и двумя.

figure10

Для пула chunksize-алгоритма стабилизация при n_chunks = n_workers * 4 посредством вышеупомянутой дополнительной обработки предотвращает создание нового раздела здесь и ограничивает общий ресурс холостого хода одним рабочим в течение достаточно длинных итераций. Не только это, но и алгоритм будет продолжать сокращать относительный размер доли холостого хода, что приводит к значению RDE, приближающемуся к 100%.

Например, "достаточно долго" для n_workers=4 - это len_iterable=210. Для итераций, равных или превышающих это, доля холостого хода будет ограничена одним рабочим, черта, первоначально утраченная из-за 4 -multiplication в алгоритме chunksize в первую очередь.

figure11

Наивный алгоритм уменьшения размера также сходится к 100%, но это происходит медленнее. Сходящийся эффект зависит исключительно от того, что относительная часть хвоста сжимается для случаев, когда будет две секции. Этот хвост только с одним занятым работником ограничен длиной оси x n_workers - 1, максимальный возможный остаток для len_iterable/n_workers.

Как фактические значения RDE отличаются для наивного и пула chunksize-алгоритма?

Ниже вы найдете две тепловые карты, показывающие значения RDE для всех повторяющихся длин до 5000, для всех чисел рабочих от 2 до 100. Цветовая шкала изменяется от 0,5 до 1 (50% -100%). Вы заметите намного больше темных областей (более низкие значения RDE) для наивного алгоритма в левой тепловой карте. В отличие от этого, пул chunksize-алгоритм справа рисует гораздо более солнечную картину.

figure12

Диагональный градиент нижних левых темных углов по сравнению с правыми верхними светлыми углами снова показывает зависимость от числа рабочих для того, что можно назвать "длинным итеративным".

Насколько плохо это может быть с каждым алгоритмом?

При использовании алгоритма пула chunksize значение RDE 81,25% является наименьшим значением для диапазона рабочих и повторяемых длин, указанных выше:

figure13

С наивным алгоритмом chunksize все может стать намного хуже. Самый низкий рассчитанный RDE здесь составляет 50,72%. В этом случае почти на половину времени вычислений работает только один рабочий! Итак, берегитесь, гордые обладатели Knights Landing. ;)

figure14


8. Проверка реальности

В предыдущих главах мы рассматривали упрощенную модель для чисто математической задачи распределения, лишенную мельчайших деталей, которые делают многопроцессорность такой сложной темой в первую очередь. Чтобы лучше понять, насколько далеко одна Модель распределения (DM) может внести вклад в объяснение наблюдаемого использования работника в реальности, теперь мы немного рассмотрим параллельные графики, составленные на основе реальных вычислений.

Настроить

На следующих графиках представлены параллельные выполнения простой фиктивной функции, связанной с процессором, которая вызывается с различными аргументами, поэтому мы можем наблюдать, как нарисованное параллельное расписание изменяется в зависимости от входных значений. "Работа" внутри этой функции состоит только из итерации по объекту диапазона. Этого уже достаточно, чтобы сохранить ядро занятым, так как мы передаем огромные числа. При желании функция принимает некоторые уникальные для Taskel дополнительные data которые просто возвращаются без изменений. Поскольку каждая задача состоит из одного и того же объема работы, мы все еще имеем дело с плотным сценарием.

Функция украшена оболочкой, которая берет временные метки с разрешением ns (Python 3. 7+). Временные метки используются для расчета временного промежутка задачи и, следовательно, позволяют рисовать эмпирическое параллельное расписание.

@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper

Метод звездной карты пула также оформлен таким образом, что синхронизируется только сама звездная карта -call. "Начало" и "конец" этого вызова определяют минимум и максимум на оси х создаваемого параллельного расписания.

Мы собираемся наблюдать вычисление 40 задач на четырех рабочих процессах на машине со следующими спецификациями: Python 3.7.1, Ubuntu 18.04.2, процессор Intel® Core ™ i7-2600K @3.40 ГГц × 8

Входными значениями, которые будут варьироваться, являются число итераций в цикле for (30k, 30M, 600M) и дополнительный размер отправляемых данных (на одно целое, numpy-ndarray: 0 MiB, 50 MiB).

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

Показанные ниже прогоны были выбраны вручную, чтобы иметь одинаковый порядок чанков, чтобы вы могли лучше различать различия по сравнению с параллельным расписанием из модели распределения, но не забывайте, что порядок, в котором работники получают свою задачу, является недетерминированным.

Предсказание DM

Напомним, что модель распределения "предсказывает" параллельное расписание, как мы уже видели в главе 6.2:

figure15

1-й запуск: 30 тыс. Итераций и 0 МБ данных на одну задачу

figure16

Наш первый пробег здесь очень короткий, задачи очень легкие. Весь pool.starmap() -call занял всего 14,5 мс. Вы заметите, что в отличие от DM, холостой ход не ограничивается хвостовой частью, но также имеет место между задачами и даже между заданиями. Это потому, что наше реальное расписание здесь, естественно, включает в себя все виды накладных расходов. Холостой ход здесь означает только все, что находится за рамкой. Возможный настоящий холостой ход во время выполнения задачи не фиксируется, как уже упоминалось ранее.

Далее вы можете видеть, что не все работники получают свои задачи одновременно. Это связано с тем, что все работники питаются по общей inqueue и только один работник может читать из нее одновременно. То же самое касается и outqueue. Это может вызвать большие расстройства, как только вы передадите данные без предельных размеров, как мы увидим позже.

Кроме того, вы можете видеть, что, несмотря на тот факт, что каждое задание включает в себя одинаковый объем работы, фактический измеренный промежуток времени для задания точно изменяется. Задачам, раздаемым работнику-3 и работнику-4, требуется больше времени, чем обработанным первыми двумя работниками. Я подозреваю, что для этого прогона это связано с тем, что в тот момент турбо-надстройка больше не была доступна на ядрах для worker-3/4, поэтому они выполняли свои задачи с более низкой тактовой частотой.

Все вычисления настолько легки, что аппаратные или операционные факторы хаоса могут сильно исказить PS. Вычисление - это "листок на ветру", и DM -prediction не имеет большого значения даже для теоретически подходящего сценария.

2-й запуск: 30 миллионов итераций и 0 МБ данных на одну задачу

figure17

Увеличение числа итераций в цикле for с 30 000 до 30 миллионов приводит к реальному параллельному расписанию, которое близко к идеальному совпадению с прогнозируемым данными, предоставленными DM, ура! Вычисления на одно задание теперь достаточно тяжелые, чтобы маргинализовать части холостого хода в начале и между ними, оставляя видимой только большую долю холостого хода, которую предсказал DM.

3-й запуск: 30 миллионов итераций и 50 МБ данных на одну задачу

figure18

Сохранение 30M итераций, но дополнительно отправка 50 МБ на одно задание назад и вперед искажает картину снова. Здесь эффект очереди хорошо виден. Worker-4 должен ждать дольше своей второй задачи, чем Worker-1. Теперь представьте себе этот график с 70 работниками!

В случае, если задачи в вычислительном отношении очень легки, но предоставляют значительный объем данных в качестве полезной нагрузки, узкое место одной общей очереди может предотвратить любые дополнительные преимущества добавления большего количества рабочих в пул, даже если они поддерживаются физическими ядрами. В таком случае Worker-1 можно было бы выполнить со своей первой задачей и ожидать нового еще до того, как Worker-40 получит свою первую задачу.

Теперь должно стать очевидным, почему время вычислений в Pool не всегда уменьшается линейно с количеством работников. Отправка относительно больших объемов данных может привести к сценариям, в которых большую часть времени тратится на ожидание копирования данных в адресное пространство работника, и только один работник может быть подан одновременно.

4-й запуск: 600 миллионов итераций и 50 мегабайт данных на одно задание

figure19

Здесь мы снова отправляем 50 МБ, но увеличиваем число итераций с 30 до 600 МБ, что увеличивает общее время вычислений с 10 до 152 с. Снова нарисованное параллельное расписание близко к идеальному совпадению с прогнозируемым, накладные расходы при копировании данных маргинализируются.


9. Вывод

Обсуждаемое умножение на 4 увеличивает гибкость планирования, но также использует неравномерность распределений Taskel. Без этого умножения доля холостого хода была бы ограничена одним рабочим даже для коротких итераций (для DM с плотным сценарием). Алгоритм пула chunksize-нуждается в итерациях ввода, чтобы иметь определенный размер, чтобы восстановить эту черту.

Как мы надеемся, этот ответ показал, что алгоритм chunksize пула в среднем приводит к лучшему использованию ядра по сравнению с наивным подходом, по крайней мере, для среднего случая и поскольку длительные накладные расходы не рассматриваются. Наивный алгоритм здесь может иметь эффективность распределения (DE) всего лишь ~ 51%, в то время как алгоритм размера пула имеет свой минимум - ~ 81%. DE, однако, не включает издержки распараллеливания (PO), такие как IPC. Глава 8 показала, что DE все еще может иметь большую предсказательную силу для плотного сценария с маргинальными накладными расходами.

Несмотря на то, что алгоритм пула chunksize-алгоритма обеспечивает более высокое DE по сравнению с наивным подходом, он не обеспечивает оптимального распределения задач для каждого входного созвездия. Хотя простой статический алгоритм разбиения на блоки не может оптимизировать (включая накладные расходы) эффективность распараллеливания (PE), нет внутренней причины, по которой он не всегда может обеспечить относительную эффективность распределения (RDE), равную 100%, то есть тот же DE, что и с chunksize=1. Простой chunksize-алгоритм состоит только из базовой математики и может свободно "разрезать торт" любым способом.

В отличие от реализации пула алгоритма "одинакового размера-чанкинга", алгоритм "равного размера-чанкинга" обеспечивал бы RDE 100% для каждой len_iterable/n_workers. Алгоритм разделения на четные размеры будет немного сложнее реализовать в исходном коде пула, но его можно модулировать поверх существующего алгоритма, просто упаковывая задачи извне (я сошлюсь здесь на случай, если я уроню Q/A на как это сделать).

Ответ 3

Я думаю, что часть того, что вам не хватает, заключается в том, что ваша наивная оценка предполагает, что каждая единица работы занимает одинаковое количество времени, и в этом случае ваша стратегия будет наилучшей. Но если некоторые задания заканчиваются раньше, чем другие, некоторые ядра могут простаивать в ожидании завершения медленных заданий.

Таким образом, разбивая фрагменты на 4 раза больше частей, тогда, если один блок закончился рано, это ядро может запустить следующий блок (в то время как другие ядра продолжают работать над их более медленным блоком).

Я не знаю, почему они выбрали фактор 4 точно, но это было бы компромиссом между минимизацией накладных расходов кода карты (который требует наибольшего возможного количества чанков) и балансировкой чанков, занимающих различное количество раз (что требует наименьшего возможного чанка)).