Контекстные менеджеры и многопроцессорные пулы

Предположим, что вы используете объект multiprocessing.Pool, и вы используете параметр initializer конструктора для передачи функции инициализации, которая затем создает ресурс в глобальном пространстве имен. Предположим, что у ресурса есть менеджер контекста. Как бы вы справлялись с жизненным циклом ресурса, управляемого контекстом, при условии, что ему нужно прожить жизнь в процессе, но правильно ли очиститься в конце?

До сих пор у меня есть что-то вроде этого:

resource_cm = None
resource = None


def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

С этого момента процессы пула могут использовать ресурс. Все идет нормально. Но обработка очистки немного сложнее, так как класс multiprocessing.Pool не предоставляет аргумент destructor или deinitializer.

Одна из моих идей - использовать модуль atexit и зарегистрировать очистку в инициализаторе. Что-то вроде этого:

def _worker_init(args):
    global resource
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()

    def _clean_up():
        resource_cm.__exit__()

    import atexit
    atexit.register(_clean_up)

Это хороший подход? Есть ли более простой способ сделать это?

EDIT: atexit, похоже, не работает. По крайней мере, не так, как я использую его выше, поэтому прямо сейчас у меня все еще нет решения этой проблемы.

Ответы

Ответ 1

Во-первых, это действительно большой вопрос! Покопавшись немного в коде multiprocessing, я думаю, что нашел способ сделать это:

При запуске multiprocessing.Pool внутри объекта Pool создается объект multiprocessing.Process для каждого члена пула. Когда эти подпроцессы запускаются, они вызывают функцию _bootstrap, которая выглядит следующим образом:

def _bootstrap(self):
    from . import util
    global _current_process
    try:
        # ... (stuff we don't care about)
        util._finalizer_registry.clear()
        util._run_after_forkers()
        util.info('child process calling self.run()')
        try:
            self.run()
            exitcode = 0 
        finally:
            util._exit_function()
        # ... (more stuff we don't care about)

Метод run - это то, что на самом деле запускает target, который вы дали объекту Process. Для процесса Pool следует, что метод с длительным циклом while, который ожидает, что рабочие элементы попадут во внутреннюю очередь. Для нас действительно интересно то, что произошло после self.run: util._exit_function().

Как выясняется, эта функция выполняет некоторую очистку, которая очень похожа на то, что вы ищете:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                   active_children=active_children,
                   current_process=current_process):
    # NB: we hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module globals are destroyed.

    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
    _run_finalizers(0)

Здесь docstring _run_finalizers:

def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''

Метод фактически проходит через список обратных вызовов финализатора и выполняет их:

items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)

for key, finalizer in items:
    sub_debug('calling %s', finalizer)
    try:
        finalizer()
    except Exception:
        import traceback
        traceback.print_exc()

Perfect. Итак, как мы попадаем в _finalizer_registry? Там недокументированный объект с именем Finalize в multiprocessing.util, который отвечает за добавление обратного вызова в реестр:

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())
        self._pid = os.getpid()

        _finalizer_registry[self._key] = self  # That what we're looking for!

Итак, положив все это в пример:

import multiprocessing
from multiprocessing.util import Finalize

resource_cm = None
resource = None

class Resource(object):
    def __init__(self, args):
        self.args = args

    def __enter__(self):
        print("in __enter__ of %s" % multiprocessing.current_process())
        return self

    def __exit__(self, *args, **kwargs):
        print("in __exit__ of %s" % multiprocessing.current_process())

def open_resource(args):
    return Resource(args)

def _worker_init(args):
    global resource
    print("calling init")
    resource_cm = open_resource(args)
    resource = resource_cm.__enter__()
    # Register a finalizer
    Finalize(resource, resource.__exit__, exitpriority=16)

def hi(*args):
    print("we're in the worker")

if __name__ == "__main__":
    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
    pool.map(hi, range(pool._processes))
    pool.close()
    pool.join()

Вывод:

calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we're in the worker
we're in the worker
we're in the worker
we're in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>

Как вы можете видеть, __exit__ вызывается во всех наших рабочих, когда мы join() пул.