Python: получение трассировки из многопроцессорной обработки.
Я пытаюсь получить объект трассировки из многопроцессорного процесса.
К сожалению, передача информации об исключении через трубу не работает, потому что объекты трассировки не могут быть маринованными:
def foo(pipe_to_parent):
try:
raise Exception('xxx')
except:
pipe_to_parent.send(sys.exc_info())
to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()
Traceback:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "foo", line 7, in foo
to_parent.send(sys.exc_info())
PicklingError: Can't pickle <type 'traceback'>: attribute lookup __builtin__.traceback failed
Есть ли другой способ доступа к информации об исключении? Я хотел бы избежать передачи форматированной строки.
Ответы
Ответ 1
Используя tblib
, вы можете передавать завернутые исключения и повторно вызывать их позже:
import tblib.pickling_support
tblib.pickling_support.install()
from multiprocessing import Pool
import sys
class ExceptionWrapper(object):
def __init__(self, ee):
self.ee = ee
__, __, self.tb = sys.exc_info()
def re_raise(self):
raise self.ee.with_traceback(self.tb)
# for Python 2 replace the previous line by:
# raise self.ee, None, self.tb
# example how to use ExceptionWrapper
def inverse(i):
"""will fail for i == 0"""
try:
return 1.0 / i
except Exception as e:
return ExceptionWrapper(e)
def main():
p = Pool(1)
results = p.map(inverse, [0, 1, 2, 3])
for result in results:
if isinstance(result, ExceptionWrapper):
result.re_raise()
if __name__ == "__main__":
main()
Итак, если вы поймали исключение в своем удаленном процессе, оберните его с помощью ExceptionWrapper
и затем передайте обратно. Вызов re_reraise
в основном процессе сделает всю работу.
Ответ 2
Так как multiprocessing
выводит содержимое строки исключений, созданных в дочерних процессах, вы можете обернуть весь свой код дочернего процесса в try-except, за исключением того, что он ловит любые исключения, форматирует relavent трассировки стека и создает новый Exception
который содержит всю соответствующую информацию в своей строке:
Пример функции, которую я использую с multiprocessing.map
:
def run_functor(functor):
"""
Given a no-argument functor, run it and return its result. We can
use this with multiprocessing.map and map it over a list of job
functors to do them.
Handles getting more than multiprocessing pitiful exception output
"""
try:
# This is where you do your actual work
return functor()
except:
# Put all exception text into an exception and raise that
raise Exception("".join(traceback.format_exception(*sys.exc_info())))
То, что вы получаете, это трассировка стека с другой отформатированной трассировкой стека как сообщение об ошибке, которое помогает при отладке.
Ответ 3
Кажется, трудно сделать picklable объектом трассировки.
Но вы можете отправлять только 2 первых элемента sys.exc_info()
и предварительно сформированную информацию трассировки с помощью метода traceback.extract_tb:
import multiprocessing
import sys
import traceback
def foo(pipe_to_parent):
try:
raise Exception('xxx')
except:
except_type, except_class, tb = sys.exc_info()
pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))
to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()
которые дают вам:
(<type 'exceptions.Exception'>, Exception('xxx',), [('test_tb.py', 7, 'foo', "raise Exception('xxx')")])
И тогда вы сможете получить больше информации об исключении причины (имя файла, номер строки, где исключение поднято, имя метода и оператор, который вызывает исключение)
Ответ 4
Python 3
В Python 3 теперь метод get
в multiprocessing.pool.Async
возвращает полный возврат, см. http://bugs.python.org/issue13831.
Python 2
Используйте traceback.format_exc
(что означает форматированное исключение), чтобы получить строку трассировки.
Сделать декоратор было бы намного удобнее, как показано ниже.
def full_traceback(func):
import traceback, functools
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
msg = "{}\n\nOriginal {}".format(e, traceback.format_exc())
raise type(e)(msg)
return wrapper
Пример:
def func0():
raise NameError("func0 exception")
def func1():
return func0()
# Key is here!
@full_traceback
def main(i):
return func1()
if __name__ == '__main__':
from multiprocessing import Pool
pool = Pool(4)
try:
results = pool.map_async(main, range(5)).get(1e5)
finally:
pool.close()
pool.join()
Обратная связь с декоратором:
Traceback (most recent call last):
File "bt.py", line 34, in <module>
results = pool.map_async(main, range(5)).get(1e5)
File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
NameError: Exception in func0
Original Traceback (most recent call last):
File "bt.py", line 13, in wrapper
return func(*args, **kwargs)
File "bt.py", line 27, in main
return func1()
File "bt.py", line 23, in func1
return func0()
File "bt.py", line 20, in func0
raise NameError("Exception in func0")
NameError: Exception in func0
Трассировка без декоратора:
Traceback (most recent call last):
File "bt.py", line 34, in <module>
results = pool.map_async(main, range(5)).get(1e5)
File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
NameError: Exception in func0
Ответ 5
Это вариация этого отличного ответа. Оба пользователя полагаются на tblib для сохранения трассировки.
Однако вместо того, чтобы возвращать объект исключения (по запросу OP), функция worker
может быть оставлена как-есть и просто завернута в try
/except
для хранения исключений для re -raise.
import tblib.pickling_support
tblib.pickling_support.install()
import sys
class DelayedException(Exception):
def __init__(self, ee):
self.ee = ee
__, __, self.tb = sys.exc_info()
super(DelayedException, self).__init__(str(ee))
def re_raise(self):
raise self.ee, None, self.tb
Пример
def worker():
try:
raise ValueError('Something went wrong.')
except Exception as e:
raise DelayedException(e)
if __name__ == '__main__':
import multiprocessing
pool = multiprocessing.Pool()
try:
pool.imap(worker, [1, 2, 3])
except DelayedException as e:
e.re_raise()
Ответ 6
Те же решения, что и @Syrtis Major и @interfect, но протестированные с Python 3.6:
import sys
import traceback
import functools
def catch_remote_exceptions(wrapped_function):
""" https://stackoverflow.com/info/6126007/python-getting-a-traceback """
@functools.wraps(wrapped_function)
def new_function(*args, **kwargs):
try:
return wrapped_function(*args, **kwargs)
except:
raise Exception( "".join(traceback.format_exception(*sys.exc_info())) )
return new_function
Использование:
class ProcessLocker(object):
@catch_remote_exceptions
def __init__(self):
super().__init__()
@catch_remote_exceptions
def create_process_locks(self, total_processes):
self.process_locks = []
# ...