Python: multiprocessing.map: если один процесс вызывает исключение, почему не называются блоки finally, называемые другими процессами?
Мое понимание состоит в том, что, наконец, предложения должны всегда выполняться, если попытка была введена.
import random
from multiprocessing import Pool
from time import sleep
def Process(x):
try:
print x
sleep(random.random())
raise Exception('Exception: ' + x)
finally:
print 'Finally: ' + x
Pool(3).map(Process, ['1','2','3'])
Ожидаемый результат - это то, что для каждого из x, которое печатается самостоятельно по строке 8, должно быть возникновение "Наконец x".
Пример вывода:
$ python bug.py
1
2
3
Finally: 2
Traceback (most recent call last):
File "bug.py", line 14, in <module>
Pool(3).map(Process, ['1','2','3'])
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
return self.map_async(func, iterable, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
Exception: Exception: 2
Кажется, что исключение, завершающее один процесс, завершает родительский и родственный процессы, даже если в других процессах требуется дополнительная работа.
Почему я ошибаюсь? Почему это правильно? Если это правильно, как следует безопасно очищать ресурсы в многопроцессорном Python?
Ответы
Ответ 1
Короткий ответ: SIGTERM
trumps finally
.
Длинный ответ: включите ведение журнала с помощью mp.log_to_stderr()
:
import random
import multiprocessing as mp
import time
import logging
logger=mp.log_to_stderr(logging.DEBUG)
def Process(x):
try:
logger.info(x)
time.sleep(random.random())
raise Exception('Exception: ' + x)
finally:
logger.info('Finally: ' + x)
result=mp.Pool(3).map(Process, ['1','2','3'])
Выход журнала включает в себя:
[DEBUG/MainProcess] terminating workers
Что соответствует этому коду в multiprocessing.pool._terminate_pool
:
if pool and hasattr(pool[0], 'terminate'):
debug('terminating workers')
for p in pool:
p.terminate()
Каждый p
в pool
является multiprocessing.Process
, а вызов terminate
(по крайней мере, на компьютерах, отличных от Windows) вызывает SIGTERM:
из multiprocessing/forking.py
:
class Popen(object)
def terminate(self):
...
try:
os.kill(self.pid, signal.SIGTERM)
except OSError, e:
if self.wait(timeout=0.1) is None:
raise
Итак, дело сводится к тому, что происходит, когда процесс Python в пакете try
отправляется SIGTERM
.
Рассмотрим следующий пример (test.py):
import time
def worker():
try:
time.sleep(100)
finally:
print('enter finally')
time.sleep(2)
print('exit finally')
worker()
Если вы запустите его, отправьте его SIGTERM
, затем процесс немедленно завершится, без ввода набора finally
, о чем свидетельствует отсутствие вывода и отсутствие задержки.
В одном терминале:
% test.py
Во втором терминале:
% pkill -TERM -f "test.py"
Результат в первом терминале:
Terminated
Сравните это с тем, что происходит при отправке процесса SIGINT
(C-c
):
Во втором терминале:
% pkill -INT -f "test.py"
Результат в первом терминале:
enter finally
exit finally
Traceback (most recent call last):
File "/home/unutbu/pybin/test.py", line 14, in <module>
worker()
File "/home/unutbu/pybin/test.py", line 8, in worker
time.sleep(100)
KeyboardInterrupt
Вывод: SIGTERM
trumps finally
.
Ответ 2
Ответ от unutbu определенно объясняет, почему вы получаете наблюдаемое поведение. Однако следует подчеркнуть, что SIGTERM отправляется только из-за того, как реализуется multiprocessing.pool._terminate_pool
. Если вы можете избежать использования Pool
, вы можете получить желаемое поведение. Вот заимствованный пример:
from multiprocessing import Process
from time import sleep
import random
def f(x):
try:
sleep(random.random()*10)
raise Exception
except:
print "Caught exception in process:", x
# Make this last longer than the except clause in main.
sleep(3)
finally:
print "Cleaning up process:", x
if __name__ == '__main__':
processes = []
for i in range(4):
p = Process(target=f, args=(i,))
p.start()
processes.append(p)
try:
for process in processes:
process.join()
except:
print "Caught exception in main."
finally:
print "Cleaning up main."
После отправки SIGINT есть пример:
Caught exception in process: 0
^C
Cleaning up process: 0
Caught exception in main.
Cleaning up main.
Caught exception in process: 1
Caught exception in process: 2
Caught exception in process: 3
Cleaning up process: 1
Cleaning up process: 2
Cleaning up process: 3
Обратите внимание, что предложение finally
запускается для всех процессов. Если вам нужна общая память, подумайте об использовании Queue
, Pipe
, Manager
или какого-либо внешнего хранилища, например redis
или sqlite3
.
Ответ 3
finally
повторно вызывает исходное исключение если вы не return
от него. Исключение затем увеличивается на Pool.map
и убивает все ваше приложение. Подпроцессы завершаются, и вы не видите никаких других исключений.
Вы можете добавить return
для проглатывания исключения:
def Process(x):
try:
print x
sleep(random.random())
raise Exception('Exception: ' + x)
finally:
print 'Finally: ' + x
return
Тогда вы должны иметь None
в своем map
результате, когда возникло исключение.