Исключение, вызванное многопроцессорностью Пул не обнаружен
Похоже, что когда возникает исключение из процесса multiprocessing.Pool, нет трассировки стека или каких-либо других признаков того, что она потерпела неудачу. Пример:
from multiprocessing import Pool
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go)
p.close()
p.join()
печатает 1 и останавливается молча. Интересно, что вместо этого работает BaseException. Есть ли способ сделать поведение для всех исключений таким же, как BaseException?
Ответы
Ответ 1
У меня есть разумное решение проблемы, по крайней мере для целей отладки. В настоящее время у меня нет решения, которое восстановят исключение в основных процессах. Моя первая мысль заключалась в том, чтобы использовать декоратор, но вы можете только раскрыть функции, определенные на верхнем уровне модуля, так что прямо.
Вместо этого простой класс упаковки и подкласс Pool, который использует это для apply_async
(и, следовательно, apply
). Я оставлю map_async
как упражнение для читателя.
import traceback
from multiprocessing.pool import Pool
import multiprocessing
# Shortcut to multiprocessing logger
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)
class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable
def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)
except Exception as e:
# Here we add some debugging help. If multiprocessing's
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise
# It was fine, give a normal answer
return result
class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)
def go():
print(1)
raise Exception()
print(2)
multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)
p.apply_async(go)
p.close()
p.join()
Это дает мне:
1
[ERROR/PoolWorker-1] Traceback (most recent call last):
File "mpdebug.py", line 24, in __call__
result = self.__callable(*args, **kwargs)
File "mpdebug.py", line 44, in go
raise Exception()
Exception
Ответ 2
Возможно, я что-то упустил, но не так ли, что возвращает метод get
объекта Result? См. Пулы процессов.
класс multiprocessing.pool.AsyncResult
Класс результата, возвращаемого Pool.apply_async() и Pool.map_async(). get ([timeout])
Верните результат, когда он придет. Если тайм-аут не является ничем, и результат не приходит таймаут секунд, а затем многопроцессорность .TimeoutError поднят. Если пульт дистанционного управления вызов вызывает исключение, тогда это исключение будет ререйзироваться с помощью get().
Итак, слегка изменив ваш пример, можно сделать
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()
Что дает результат
1
Traceback (most recent call last):
File "rob.py", line 10, in <module>
x.get()
File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
Exception: foobar
Это не вполне удовлетворительно, так как он не печатает трассировку, но лучше, чем ничего.
UPDATE: эта ошибка была исправлена в Python 3.4, любезно предоставлена Ричардом Оудкерком. См. Вопрос получить метод multiprocessing.pool.Async должен вернуть полную трассировку.
Ответ 3
Решение с большинством голосов на момент написания статьи имеет проблему:
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get() ## waiting here for go() to complete...
p.close()
p.join()
Как отметил @dfrankow, он будет ждать x.get()
, что разрушает точку запуска задачи асинхронно. Таким образом, для повышения эффективности (особенно если ваша рабочая функция go
занимает много времени), я бы изменил ее на:
from multiprocessing import Pool
def go(x):
print(1)
# task_that_takes_a_long_time()
raise Exception("Can't go anywhere.")
print(2)
return x**2
p = Pool()
results = []
for x in range(1000):
results.append( p.apply_async(go, [x]) )
p.close()
for r in results:
r.get()
Преимущества: рабочая функция запускается асинхронно, поэтому, если, например, вы запускаете много задач на нескольких ядрах, это будет намного более эффективно, чем исходное решение.
Недостатки: если в рабочей функции есть исключение, она будет только поднята после, пул выполнит все задачи. Это может быть или не быть желательным. EDITED в соответствии с комментарием @colinfang, который исправил это.
Ответ 4
У меня были успешные протоколирующие исключения с этим декоратором:
import traceback, functools, multiprocessing
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print 'Exception in '+func.__name__
traceback.print_exc()
return wrapped_func
с кодом в вопросе, это
@trace_unhandled_exceptions
def go():
print(1)
raise Exception()
print(2)
p = multiprocessing.Pool(1)
p.apply_async(go)
p.close()
p.join()
Просто украсьте функцию, которую вы передаете в пул процессов. Ключом к этой работе является @functools.wraps(func)
, иначе многопроцессор выдает a PicklingError
.
приведенный выше код дает
1
Exception in go
Traceback (most recent call last):
File "<stdin>", line 5, in wrapped_func
File "<stdin>", line 4, in go
Exception
Ответ 5
Я создал модуль RemoteException.py, который показывает полную отслеживание исключения в процессе. Python2. Скачайте и добавьте это в свой код:
import RemoteException
@RemoteException.showError
def go():
raise Exception('Error!')
if __name__ == '__main__':
import multiprocessing
p = multiprocessing.Pool(processes = 1)
r = p.apply(go) # full traceback is shown here
Ответ 6
import logging
from multiprocessing import Pool
def proc_wrapper(func, *args, **kwargs):
"""Print exception because multiprocessing lib doesn't return them right."""
try:
return func(*args, **kwargs)
except Exception as e:
logging.exception(e)
raise
def go(x):
print x
raise Exception("foobar")
p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()
Ответ 7
Я бы попытался использовать pdb:
import pdb
import sys
def handler(type, value, tb):
pdb.pm()
sys.excepthook = handler
Ответ 8
Поскольку вы использовали apply_sync
, я предполагаю, что в случае использования требуется выполнить некоторые задачи синхронизации. Использовать обратный вызов для обработки - это еще один вариант. Обратите внимание, что этот параметр доступен только для python3.2 и выше и недоступен на python2.7.
from multiprocessing import Pool
def callback(result):
print('success', result)
def callback_error(result):
print('error', result)
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)
# You can do another things
p.close()
p.join()
Ответ 9
Поскольку для multiprocessing.Pool
уже имеются достойные ответы, я предоставил решение с использованием другого подхода для полноты.
Для python >= 3.2
следующее решение кажется самым простым:
from concurrent.futures import ProcessPoolExecutor, wait
def go():
print(1)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor() as p:
for i in range(10):
futures.append(p.submit(go))
results = [f.result() for f in futures]
Преимущества:
- очень маленький код
- вызывает исключение в основном процессе
- предоставляет трассировку стека
- нет внешних зависимостей
Для получения дополнительной информации об API, пожалуйста, проверьте: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
Кроме того, если вы отправляете большое количество задач, и вы хотите, чтобы ваш основной процесс завершился с ошибкой, как только одна из ваших задач выйдет из строя, вы можете использовать следующий фрагмент:
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time
def go():
print(1)
time.sleep(0.3)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor(1) as p:
for i in range(10):
futures.append(p.submit(go))
for f in as_completed(futures):
if f.exception() is not None:
for f in futures:
f.cancel()
break
[f.result() for f in futures]
Все остальные ответы завершаются только после выполнения всех задач.