Ответ 1
В документах действительно есть подсказка:
обратный вызов должен завершиться сразу после иначе поток, который обрабатывает результаты, будет заблокирован.
Обратные вызовы обрабатываются в основном процессе, но они запускаются в отдельном потоке. Когда вы создаете Pool
, он фактически создает несколько Thread
объектов внутри:
class Pool(object):
Process = Process
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
... # stuff we don't care about
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self, )
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
self._task_handler.start()
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
self._result_handler.start()
Интересная тема для нас - _result_handler
; мы скоро поймем.
Переключение передач на секунду, когда вы запускаете apply_async
, он создает объект ApplyResult
внутри, чтобы управлять получением результата от дочернего элемента:
def apply_async(self, func, args=(), kwds={}, callback=None):
assert self._state == RUN
result = ApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
class ApplyResult(object):
def __init__(self, cache, callback):
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
self._cache = cache
self._ready = False
self._callback = callback
cache[self._job] = self
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
Как вы можете видеть, метод _set
- это тот, который завершает фактическое выполнение переданного callback
, если задача выполнена успешно. Также обратите внимание, что он добавляет себя к глобальному cache
dict в конце __init__
.
Теперь вернемся к объекту нити _result_handler
. Этот объект вызывает функцию _handle_results
, которая выглядит так:
while 1:
try:
task = get()
except (IOError, EOFError):
debug('result handler got EOFError/IOError -- exiting')
return
if thread._state:
assert thread._state == TERMINATE
debug('result handler found thread._state=TERMINATE')
break
if task is None:
debug('result handler got sentinel')
break
job, i, obj = task
try:
cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called!
except KeyError:
pass
# More stuff
Это цикл, который просто извлекает результаты из дочерних элементов из очереди, находит запись для него в cache
и вызывает _set
, который выполняет наш обратный вызов. Он способен работать, даже если вы находитесь в цикле, потому что он не работает в основном потоке.