Ответ 1
Во-первых, это действительно большой вопрос! Покопавшись немного в коде multiprocessing
, я думаю, что нашел способ сделать это:
При запуске multiprocessing.Pool
внутри объекта Pool
создается объект multiprocessing.Process
для каждого члена пула. Когда эти подпроцессы запускаются, они вызывают функцию _bootstrap
, которая выглядит следующим образом:
def _bootstrap(self):
from . import util
global _current_process
try:
# ... (stuff we don't care about)
util._finalizer_registry.clear()
util._run_after_forkers()
util.info('child process calling self.run()')
try:
self.run()
exitcode = 0
finally:
util._exit_function()
# ... (more stuff we don't care about)
Метод run
- это то, что на самом деле запускает target
, который вы дали объекту Process
. Для процесса Pool
следует, что метод с длительным циклом while, который ожидает, что рабочие элементы попадут во внутреннюю очередь. Для нас действительно интересно то, что произошло после self.run
: util._exit_function()
.
Как выясняется, эта функция выполняет некоторую очистку, которая очень похожа на то, что вы ищете:
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
active_children=active_children,
current_process=current_process):
# NB: we hold on to references to functions in the arglist due to the
# situation described below, where this function is called after this
# module globals are destroyed.
global _exiting
info('process shutting down')
debug('running all "atexit" finalizers with priority >= 0') # Very interesting!
_run_finalizers(0)
Здесь docstring _run_finalizers
:
def _run_finalizers(minpriority=None):
'''
Run all finalizers whose exit priority is not None and at least minpriority
Finalizers with highest priority are called first; finalizers with
the same priority will be called in reverse order of creation.
'''
Метод фактически проходит через список обратных вызовов финализатора и выполняет их:
items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)
for key, finalizer in items:
sub_debug('calling %s', finalizer)
try:
finalizer()
except Exception:
import traceback
traceback.print_exc()
Perfect. Итак, как мы попадаем в _finalizer_registry
? Там недокументированный объект с именем Finalize
в multiprocessing.util
, который отвечает за добавление обратного вызова в реестр:
class Finalize(object):
'''
Class which supports object finalization using weakrefs
'''
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
assert exitpriority is None or type(exitpriority) is int
if obj is not None:
self._weakref = weakref.ref(obj, self)
else:
assert exitpriority is not None
self._callback = callback
self._args = args
self._kwargs = kwargs or {}
self._key = (exitpriority, _finalizer_counter.next())
self._pid = os.getpid()
_finalizer_registry[self._key] = self # That what we're looking for!
Итак, положив все это в пример:
import multiprocessing
from multiprocessing.util import Finalize
resource_cm = None
resource = None
class Resource(object):
def __init__(self, args):
self.args = args
def __enter__(self):
print("in __enter__ of %s" % multiprocessing.current_process())
return self
def __exit__(self, *args, **kwargs):
print("in __exit__ of %s" % multiprocessing.current_process())
def open_resource(args):
return Resource(args)
def _worker_init(args):
global resource
print("calling init")
resource_cm = open_resource(args)
resource = resource_cm.__enter__()
# Register a finalizer
Finalize(resource, resource.__exit__, exitpriority=16)
def hi(*args):
print("we're in the worker")
if __name__ == "__main__":
pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
pool.map(hi, range(pool._processes))
pool.close()
pool.join()
Вывод:
calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>
Как вы можете видеть, __exit__
вызывается во всех наших рабочих, когда мы join()
пул.