Рекомендации по использованию пакета "multiprocessing" в python
Я экспериментирую с использованием модуля multiprocessing
в python
. У меня есть приведенный ниже пример кода, который выполняется без ошибок в ноутбуке ipython. Но я вижу, что есть дополнительные процессы python, возникшие в фоновом режиме при каждом выполнении блока кода в ноутбуке.
import multiprocessing as mp
def f(x):
print "Hello World ", mp.current_process()
return 1
pool = mp.Pool(3)
data = range(0,10)
pool.map(f, data)
Если я сохраняю то же самое в обычном файле .py и выполняю, я сталкиваюсь с ошибками и должен завершить работу терминала, чтобы остановить выполнение программы.
Я исправил это, создав if __name__ == '__main__':
и создав пул под этим, а также используя pool.close()
, чтобы закрыть пул.
Мне интересно узнать, какие рекомендации следует использовать при использовании multiprocessing
и связанных с ним функций, таких как map
, apply
, apply_async
и т.д.? Я планирую использовать этот модуль для параллельного чтения файлов и, надеюсь, применить его к нескольким алгоритмам ML, чтобы ускорить процесс.
Ответы
Ответ 1
Причина, по которой вы должны поместить его в if __name__
..., заключается в том, что когда python запускает новый процесс, он эффективно импортирует этот модуль, тем самым пытаясь снова и снова запускать любой код не в блоке if __name__
.
Лучшая практика - держать вещи в разумно названных, маленьких, проверяемых функциях. Имейте функцию main(), которую вы затем вызываете из своего блока if __name__
.
Избегайте глобальных состояний (и переменных уровня модуля). Это просто усложняет ситуацию. Вместо этого подумайте о передаче вещей и от ваших процессов. Это может быть медленным, поэтому полезно сначала подумать о том, как отправлять как можно меньше данных. Например, если у вас есть большой объект конфигурации, а не отправка всего объекта конфигурации в каждый процесс, разделяйте функции процесса, требуя только один или два атрибута, которые они фактически используют, и просто отправьте их.
Гораздо проще проверять вещи, когда это происходит последовательно, поэтому записывать вещи таким образом, чтобы было легко сделать это последовательно, а не использовать map
или что-то еще облегчающее.
Это хорошая идея, чтобы прокомментировать вещи, так как весь нереста нового процесса иногда может быть медленнее, чем делать все в одном потоке. Модуль gevent тоже довольно крут - если ваша сеть связана с сетью, тогда gevent иногда может быть намного быстрее при выполнении параллельных операций, чем при использовании многопроцессорной обработки.
Ответ 2
Документы python, упомянутые, являются хорошими - ознакомьтесь с с использованием класса многопроцессорности Python.Process. У этого вопроса есть похожие идеи. Я также рекомендовал бы проверить https://www.ibm.com/developerworks/aix/library/au-multiprocessing/. Он находится в python и выделяет некоторые приятные питонические подходы к многопроцессорной обработке.
Ответ 3
Официальная документация Python содержит множество примеров использования. Вероятно, это лучший способ изучить лучшие практики: http://docs.python.org/2/library/multiprocessing.html
Ответ 4
Обзор, архитектура и дизайн несколько практических советов
Исходя из своего (также ограниченного) опыта, я могу поделиться следующими соображениями о том, как работает многопроцессорная среда и как ее использовать. Я не нашел руководства по python.org очень описательными или графическими, поэтому прочитал код. Для всех, у кого было такое же впечатление... это то, что я мог бы исправить до сих пор:
Общие советы по хорошей/лучшей практике
- общие методы реализации:
- управляемый тестами с уменьшенным размером данных: вы не хотите удивляться минутам, если произошел сбой или расчет
- пошагово с профилированием времени:
- во-первых, реализовать & отладка без многопроцессорной обработки
- затем внедрить & отладка одиночной обработки, времени профиля и сравнить накладные расходы без нескольких процессов
- затем увеличьте номер процесса & Время профиля, чтобы определить любые проблемы GIL и время ожидания.
- Простые
Process
или их списки полезны для нацеливания нескольких запусков функций один за другим funtction-2-process.
Pool
управляет распределением пакетных рабочих нагрузок (высокоуровневых задач/команд) между заданным числом Process
es (пул процессов).
- Используйте
Pool
для привязки к процессору (высокая загрузка процессора с пакетными входами/выходами) и pool.ThreadPool
для задач, связанных с вводом-выводом (низкая загрузка процессора с отдельными входами/выходами).
- Для передачи данных между
Process
es, Pool
s, Thread
и ThreadPool
используйте queues.Queue
и подклассы (если порядок результатов имеет значение) или Pipe
с отображением 1-в-1 PipeConnection
к процессам или потокам.
- Совместное использование переменных разных типов (
BaseProxy
, Namespace
s, Queue
s, Pool
или для настройки объектов синхронизации, таких как Barrier
/Lock
/RLock
/Sempaphore
/Condition
) различные процессы используют класс Manager
.
- В случае, если нельзя избежать
GIL
, используйте Manager
для их обработки и попытайтесь отделить интенсивные вычислительные процессы от вычислений GIL
-related (например, анализ в сложных структурах данных и т.д.) И затем соединитесь с Pipe
или поделились Queue
s.
- Работа с несколькими
Pool
может использоваться для назначения разного количества процессов различным задачам. В противном случае просто реализуйте один Pool
с множественным отображением или примените вызовы метода.
- Последовательные параллельные вычислительные задачи, основанные на промежуточных результатах друг друга, могут быть рассчитаны с помощью одного
Pool()
с несколькими Pool.(star)map_async()
или Pool.(star)map()
. Для синхронизации задач друг с другом правильным выбором является экземпляр ApplyResult()
, возвращаемый функцией отображения с ее методами ApplyResult().ready()/.wait()/.get()/.successful()
.
Архитектура и технологический процесс
- Когда запускается
import multiprocessing
, инициализируется _current_process = MainProcess()
, который является подклассом BaseProcess
, но без target
, args
, kwargs
, _paraent_pid
, в основном объект-дескриптор для всех других Process
в уже работающее ядро Python, которое импортирует multiprocessing
.
pool.ThreadPool
является аналогом API для пула, который, вероятно, также имеет аналогичную архитектуру
Pool
основан на 3 потоках демона Pool._task_handler
, Pool._worker_handler
& Pool._result_handler
, которые соединяются с 1 внутренним queue.Queue()
Pool._taskqueue
и 2 внутренними SimpleQueue
Pool._inqueue
и Pool._outqueue
.
Pool._cache
- это словарь, содержащий ApplyResults
& экземпляры подклассов из всех вызовов Pool.appy_async()/_map_async()
и субметодов с глобальным ApplyResults._job
из job_counter()
как key
.
ApplyResult
& амп; подклассы Pool
находятся либо в Pool._cache
, либо как возвращение Pool.apply_async()/._map_async()
& submethods.
- Разница между
Pool.map()
и Pool.map_async()
заключается в том, что Pool.map() == Pool.map_async().get()
заставляет/блокирует основной процесс для ожидания вычисления всех результатов и их сохранения в возвращаемом объекте ApplyResult()
.
Queue
/SimpleQueues in
Пул ':
Pool.taskqueue
: передает высокоуровневую работу Pool.apply_async()/.map_async()
/etc. преобразуется в пакеты задач из метода apply к Pool._task_handler
.
Pool._inqueue
: направляет работу как пакетный "итератор" от Pool._task_handler
до Pool._pool.Process(target=worker, ...)
Pool._outqueue
: направляет результаты из Pool._pool.Process(target=worker, ...)
(инициализированного Pool._worker_handler
) в Pool._result_handler
, который снова _set()
их в ApplyResult
кэширует в Pool._cache[self._job]
.
ApplyResult
будет хранить результаты в виде списка, если у цели func
есть возвращаемые объекты. В противном случае ApplyResult()
- это просто дескриптор для методов синхронизации, то есть методов вызова состояния результата.
- Для соединения процессов и потоков предлагается 4 класса от высокой до простой функциональности в следующем порядке:
queues.JoinableQueue
, queues.Queue
, SimpleQueue
, Pipe
/PipeConnection
.
Pipe
- это просто метод, возвращающий 2 фактических экземпляра класса PipeConnection
.
Некоторые примеры кода
import logging
import multiprocessing as mp
import random
import time
import numpy as np
from copy import deepcopy
MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]
mp.log_to_stderr(level=logging.INFO) # mp.log_to_strerr(level=logging.DEBUG)
logger = mp.get_logger()
logger.setLevel(level=logging.INFO) # mp.setLevel(level=logging.DEBUG)
def secs2hms(seconds, num_decimals=4):
hms_time = [*(*divmod(divmod(int(seconds), 60)[0], 60), divmod(int(seconds), 60)[1])]
if hasattr(seconds, '__round__'):
hms_time[-1] += seconds.__round__(num_decimals) - int(seconds)
return hms_time
class Timer():
def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
self.time_name = time_name
self.output_method = get_log_method(method_name=log_method_name)
self.time_format = time_format
self.hms_decimals = hms_decimals
self.start_time = time.time()
def start(self):
raise RuntimeError('Timer was already started at initialization.')
def stop(self, *args):
seconds_time = time.time() - self.start_time
time_name = self.time_name.format(*args)
if self.time_format == 'hms':
hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
self.output_method('{} = {}'.format(time_name, hms_time))
else:
self.output_method('{} = {}sec'.format(time_name, seconds_time))
self._delete_timer()
def _delete_timer(self):
del self
def get_log_method(method_name):
if method_name == 'debug':
log_method = logger.debug
elif method_name == 'info':
log_method = logger.info
else:
log_method = print
return log_method
def _generate_random_array(shape):
return np.array([[[random.randint(0, 1000)
for _ in range(shape[2])]
for _ in range(shape[1])]
for _ in range(shape[0])])
def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
array = _generate_random_array(shape=shape)
log_method('{}: sending 'array through 'pipe_in''.format(log_name))
pipe_in.send(array)
def random_array(shape, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append 'array' to 'shared_array''.format(log_name))
# for dataset_name in ['train', 'valid']:
# shared_arrays[dataset_name].append(array)
return array
def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append 'array' to 'shared_array''.format(log_name))
shared_arrays.append(array)
def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: appendind 'array' to 'shared_array' with currently len(nested_shared_array[\'{}\']) = {}'.format(
log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
nested_shared_arrays[dataset_name].append(array)
def nested_dict_list_deepcopy(nested_shared_arrays):
"""No hierachical switching between mp.manager.BaseProxy and unshared elements"""
nested_unshared_arrays = dict()
for key, shared_list in nested_shared_arrays.items():
nested_unshared_arrays[key] = deepcopy(shared_list)
return nested_unshared_arrays
def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('ARRAY_STATE: type(arrays) = {}'.format(type(arrays)))
try:
if hasattr(arrays, '__len__'):
log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
if len(arrays) < 20:
for idx, array in enumerate(arrays):
log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
if hasattr(array, 'shape'):
log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
else:
log_method('{}: arrays[{}] has not 'shape' attribute'.format(log_name, idx))
else:
log_method('{}: array has no '__len__' method'.format(log_name))
except BrokenPipeError as error_msg:
log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))
def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
for key, arrays in nested_arrays.items():
log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)
if __name__ == '__main__':
log_method = logger.info
# log_method cannot be pickled in map_async, therefore an extra log_method_name string is implemented to hand
# through
log_method_name = 'info'
num_samples = 100
num_processes = 1 # len(MODEL_INPUTS) #
array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]
def stdout_some_newlines(num_lines=2, sleep_time=1):
print(''.join(num_lines * ['\n']))
time.sleep(sleep_time)
# Pool with results from 'func' with 'return' received from 'AsyncResult'(='ApplyResult')
# 'AsyncResult' also used for process synchronization, e.g. waiting for processes to finish
log_method('MAIN: setting up 'Pool.map_async' with 'return'ing 'func'')
async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
# Pool with variable return
setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
getted_arrays = arrays.get()
async_return_timer.stop()
# Logging array state inside the 'pool' context manager
log_method('MAIN: arrays from 'pool.map_async() return' with in the 'pool'\ context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from 'pool.map_async() return' with in the 'pool'\ context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
# Logging array state outside the 'pool' context manager
log_method('MAIN: arrays from 'pool.map_async() return' outside the 'pool'\ context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from 'pool.map_async() return' outside the 'pool'\ context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
del pool, arrays, getted_arrays
stdout_some_newlines()
# Functionality of 'np.Process().is_alive()
log_method('IS_ALIVE: testing funcktionality of flag 'mp.Process().is_alive()' w.r.t. process status')
p = mp.Process(target=lambda x: x ** 2, args=(10,))
log_method('IS_ALIVE: after intializing, before starting: {}'.format(p.is_alive()))
p.start()
log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
time.sleep(5)
log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
p.join()
log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
p.terminate()
del p
stdout_some_newlines()
# Pool with 'func' 'return'ing results directly to the reuslt handler from 'mp.Pool().starmap_async()' of type
# 'AsyncResults()'
log_method(
'MAIN: Pool.map() is not tested explicitly because is equivalent to 'Pool.map() == Pool.map_async().get()')
stdout_some_newlines()
# Pool with results assigned to shared variable & 'AsyncResult' only used for process synchronization but
# not for result receiving
log_method(
'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
'variable')
async_shared_timer = Timer(
time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
log_method=log_method)
setup_shared_variable_timer = Timer(time_name='TIMEE_INIT: time to set up shared variable', log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
setup_shared_variable_timer.stop()
async_return_timer = Timer(
time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
setup_pool_timer = Timer(
time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
async_result = pool.starmap_async(
func=random_shared_array,
iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
log_method('MAIN: async_result.ready() befor async.wait() = {}'.format(async_result.ready()))
async_result.wait()
log_method('MAIN: async_result.ready() after async.wait() = {}'.format(async_result.ready()))
log_method('MAIN: asyn_result.sucessful() after async.wait() = {}'.format(async_result.successful()))
async_return_timer.stop()
copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
unshared_arrays = deepcopy(shared_arrays)
copy_timer.stop()
async_shared_timer.stop()
log_method('MAIN: shared_arrays from 'pool.map_async()' within 'sync_manager' context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method(
'MAIN: unshared_arrays = deepcopy(shared_arrays) from 'pool.map_async()' within 'sync_manager'\ '
'context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
log_method('MAIN: shared_arrays from 'pool.map_async()' outside 'sync_manager'\ context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method('MAIN: unshared_arrays from 'pool.map_async()' outside 'sync_manager'\ context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, async_result, pool, unshared_arrays
stdout_some_newlines()
# Same as above just with pipe instead of 'shared_arrays'
log_method('MAIN: separate process outputting to 'mp.Pipe()'')
process_pipe_timer = Timer(time_name='TIMER_PIPE: time for 'random_pipe_array' outputting through a 'mp.Pipe()')
arrays = list()
pipe_in, pipe_out = mp.Pipe()
# initialize processes
processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
array_shapes]
# Start processes
for process in processes:
process.start()
# Collect piped arrays form pipe and append them to 'arrays'
while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
log_method(
'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
len(arrays)))
arrays.append(pipe_out.recv())
# join processes
for process in processes:
process.join()
process_pipe_timer.stop()
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
pipe_in.close()
pipe_out.close()
del arrays, pipe_in, pipe_out, processes, process
stdout_some_newlines()
# Nested shared dict/list/arrays
log_method('MAIN: 'random_nested_arrays' with nested shared 'mp.Manager().dict()' and 'mp.Manager().list()'s')
nested_timer = Timer(time_name='TIMER_NESTED: time for 'random_nested_arrays()'')
with mp.Manager() as sync_manager:
nested_shared_arrays = sync_manager.dict()
nested_shared_arrays['train'] = sync_manager.list()
nested_shared_arrays['valid'] = sync_manager.list()
with mp.Pool(processes=num_processes) as pool:
nested_results = pool.starmap_async(func=random_nested_array,
iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
for dataset_name in nested_shared_arrays.keys()
for shape in array_shapes])
nested_results.wait()
unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
nested_timer.stop()
log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
stdout_some_newlines()
# List of processes targeted directly to their functions one by one
log_method(
'MAIN: separate process outputting to shared 'mp.Manager.list()' with process handles maintained in list()')
log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1=processes-to-tasks'
' relations or asynchronous single tasks calculations.')
processes_timer = Timer(
time_name='TIMER_PROCESS: time for 'random_shared_arrays' with separate {} processes'.format(num_processes),
log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
# Initialize processes
processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
for shape in array_shapes]
# Start processes
for process in processes:
process.start()
processes_timer.stop()
# Join processes = wait for processes to finish
for process in processes:
process.join()
unshared_process_arrays = deepcopy(shared_arrays)
processes_timer.stop()
log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, unshared_process_arrays, processes, process
stdout_some_newlines()