Общие сведения о многопроцессорности: управление общей памятью, блокировки и очереди в Python
Multiprocessing - это мощный инструмент в python, и я хочу его более глубоко понять.
Я хочу знать, когда использовать регулярные Locks и Queues и когда использовать многопроцессорный Manager, чтобы делиться ими среди всех процессов.
Я придумал следующие тестовые сценарии с четырьмя различными условиями для многопроцессорности:
-
Использование пула и НЕТ Диспетчер
-
Использование пула и менеджера
-
Использование отдельных процессов и НЕТ Менеджер
-
Использование отдельных процессов и диспетчера
Работа
Все условия выполняют функцию задания the_job
. the_job
состоит из некоторой печати, которая закреплена блокировкой. Более того, ввод функции просто помещается в очередь (чтобы проверить, можно ли ее восстановить из очереди). Этот ввод представляет собой просто индекс idx
из range(10)
, созданный в основном script, называемом start_scenario
(показан внизу).
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelf\n'
who= ' By run %d \n' % idx
print who
lock.release()
queue.put(idx)
Успех условия определяется как прекрасно напоминающий ввод
из очереди, см. функцию read_queue
внизу.
Условия
Условие 1 и 2 довольно самоочевидны.
Условие 1 включает в себя создание блокировки и очереди и передачу их в пул процессов:
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
(Вспомогательная функция make_iterator
приведена в нижней части этого сообщения.)
Условия 1 терпят неудачу с RuntimeError: Lock objects should only be shared between processes through inheritance
.
Условие 2 довольно похоже, но теперь блокировка и очередь находятся под наблюдением менеджера:
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
В состоянии 3 новых процесса запускаются вручную, а блокировка и очередь создаются без менеджера:
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
Условие 4 похоже, но опять же с использованием менеджера:
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
В обоих условиях - 3 и 4 - я начинаю новый
процесс для каждой из 10 задач the_job
с большинством ncores процессов
работающих в одно и то же время. Это достигается со следующей вспомогательной функцией:
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
Результат
Только условие 1 терпит неудачу (RuntimeError: Lock objects should only be shared between processes through inheritance
), тогда как остальные 3 условия успешны. Я пытаюсь обвести голову вокруг этого результата.
Почему пул должен обмениваться блокировкой и очередью между всеми процессами, а отдельные процессы из условия 3 - нет?
Я знаю, что для условий пула (1 и 2) все данные из итераторов передаются через травление, тогда как в условиях одного процесса (3 и 4) все данные из итераторов передаются путем наследования от основного процесса (Я использую Linux).
Я думаю, до тех пор, пока память не будет изменена из дочернего процесса, к той же памяти, к которой применяется родительский процесс, обращается (копирование на запись). Но как только кто-то говорит lock.acquire()
, это должно быть изменено, а дочерние процессы используют разные блокировки, размещенные где-то еще в памяти, не так ли? Как один дочерний процесс знает, что брат активировал блокировку, которая не делится через менеджера?
Наконец, несколько связанный мой вопрос, сколько разных условий 3 и 4. У обоих есть отдельные процессы, но они различаются в использовании менеджера. Оба считается действительным кодом? Или следует избегать использования менеджера, если на самом деле нет необходимости в нем?
Полный Script
Для тех, кто просто хочет скопировать и вставить все для выполнения кода, вот полный script:
__author__ = 'Me and myself'
import multiprocessing as mp
import time
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print 'I'
print 'was '
print 'here '
print '!!!!'
print '1111'
print 'einhundertelfzigelf\n'
who= ' By run %d \n' % idx
print who
lock.release()
queue.put(idx)
def read_queue(queue):
"""Turns a qeue into a normal python list."""
results = []
while not queue.empty():
result = queue.get()
results.append(result)
return results
def make_iterator(args, lock, queue):
"""Makes an iterator over args and passes the lock an queue to each element."""
return ((arg, lock, queue) for arg in args)
def start_scenario(scenario_number = 1):
"""Starts one of four multiprocessing scenarios.
:param scenario_number: Index of scenario, 1 to 4
"""
args = range(10)
ncores = 3
if scenario_number==1:
result = scenario_1_pool_no_manager(the_job, args, ncores)
elif scenario_number==2:
result = scenario_2_pool_manager(the_job, args, ncores)
elif scenario_number==3:
result = scenario_3_single_processes_no_manager(the_job, args, ncores)
elif scenario_number==4:
result = scenario_4_single_processes_manager(the_job, args, ncores)
if result != args:
print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
else:
print 'Scenario %d successful!' % scenario_number
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
def main():
"""Runs 1 out of 4 different multiprocessing scenarios"""
start_scenario(1)
if __name__ == '__main__':
main()
Ответы
Ответ 1
multiprocessing.Lock
реализуется с использованием объекта Semaphore, предоставляемого ОС. В Linux ребенок просто наследует дескриптор семафора от родителя через os.fork
. Это не копия семафора; он фактически наследует тот же дескриптор, который имеет родитель, так же, как и дескрипторы файла. Windows, с другой стороны, не поддерживает os.fork
, поэтому он должен разжечь Lock
. Это делается путем создания дублирующего дескриптора для Семафора Windows, используемого внутри объекта multiprocessing.Lock
, с использованием API DuplicateHandle
, который гласит:
Двойной дескриптор относится к тому же объекту, что и исходный дескриптор. Поэтому любые изменения объекта отражаются через оба ручки
API DuplicateHandle
позволяет вам передать права на дублированный дескриптор дочернему процессу, чтобы дочерний процесс действительно мог использовать его после его разблокировки. Создав дублированный дескриптор, принадлежащий дочернему элементу, вы можете эффективно "делить" объект блокировки.
Здесь объект семафора в multiprocessing/synchronize.py
class SemLock(object):
def __init__(self, kind, value, maxvalue):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
debug('created semlock with handle %s' % sl.handle)
self._make_methods()
if sys.platform != 'win32':
def _after_fork(obj):
obj._semlock._after_fork()
register_after_fork(self, _after_fork)
def _make_methods(self):
self.acquire = self._semlock.acquire
self.release = self._semlock.release
self.__enter__ = self._semlock.__enter__
self.__exit__ = self._semlock.__exit__
def __getstate__(self): # This is called when you try to pickle the `Lock`.
assert_spawning(self)
sl = self._semlock
return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
def __setstate__(self, state): # This is called when unpickling a `Lock`
self._semlock = _multiprocessing.SemLock._rebuild(*state)
debug('recreated blocker with handle %r' % state[0])
self._make_methods()
Обратите внимание на вызов assert_spawning
в __getstate__
, который вызывается при травлении объекта. Вот как это реализовано:
#
# Check that the current thread is spawning a child process
#
def assert_spawning(self):
if not Popen.thread_is_spawning():
raise RuntimeError(
'%s objects should only be shared between processes'
' through inheritance' % type(self).__name__
)
Эта функция гарантирует, что вы "наследуете" Lock
, вызывая thread_is_spawning
. В Linux этот метод просто возвращает False
:
@staticmethod
def thread_is_spawning():
return False
Это связано с тем, что Linux не нуждается в pickle для наследования Lock
, поэтому, если __getstate__
фактически вызывается в Linux, мы не должны наследовать. В Windows больше происходит:
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
class Popen(object):
'''
Start a subprocess to run the code of a process object
'''
_tls = thread._local()
def __init__(self, process_obj):
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, 'wb')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
@staticmethod
def thread_is_spawning():
return getattr(Popen._tls, 'process_handle', None) is not None
Здесь thread_is_spawning
возвращает True
, если объект Popen._tls
имеет атрибут process_handle
. Мы видим, что атрибут process_handle
создается в __init__
, тогда данные, которые мы хотим унаследовать, передаются от родителя к дочернему с помощью dump
, тогда атрибут удаляется. Таким образом, thread_is_spawning
будет только True
во время __init__
. Согласно этот поток списков рассылки python-идей, на самом деле это искусственное ограничение, добавленное для моделирования того же поведения, что и os.fork
в Linux. Windows фактически может поддерживать передачу Lock
в любое время, потому что DuplicateHandle
может быть запущен в любое время.
Все вышеизложенное относится к объекту Queue
, потому что он использует Lock
внутренне.
Я бы сказал, что наследование объектов Lock
предпочтительнее использовать Manager.Lock()
, потому что, когда вы используете Manager.Lock
, каждый отдельный вызов, который вы делаете в Lock
, должен быть отправлен через IPC в Manager
процесс, который будет намного медленнее, чем использование общей Lock
, которая живет внутри вызывающего процесса. Оба подхода являются вполне допустимыми.
Наконец, можно передать Lock
всем членам Pool
без использования Manager
, используя аргументы ключевого слова initializer
/initargs
:
lock = None
def initialize_lock(l):
global lock
lock = l
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
"""
lock = mp.Lock()
mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
queue = mp.Queue()
iterator = make_iterator(args, queue)
mypool.imap(jobfunc, iterator) # Don't pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.
mypool.close()
mypool.join()
return read_queue(queue)
Это работает, потому что аргументы, переданные в initargs
, передаются методу __init__
объектов Process
, которые выполняются внутри Pool
, поэтому они в конечном итоге наследуются, а не маринуются.