Как подключить отладчик к подпроцессу python?
Мне нужно отладить дочерний процесс, порожденный multiprocessing.Process()
. Дегуатор pdb
, похоже, не знает о разветвлении и не может подключиться к уже запущенным процессам.
Есть ли более умные отладчики python, которые можно привязать к подпроцессу?
Ответы
Ответ 1
Winpdb - это в значительной степени определение более умного отладчика Python. Он явно поддерживает идущий вниз по вилке, не уверен, что он отлично работает с multiprocessing.Process()
, но стоит попробовать.
Список кандидатов для проверки вашего варианта использования см. в списке Python Debuggers в вики.
Ответ 2
Я искал простое решение этой проблемы и придумал следующее:
import sys
import pdb
class ForkedPdb(pdb.Pdb):
"""A Pdb subclass that may be used
from a forked multiprocessing child
"""
def interaction(self, *args, **kwargs):
_stdin = sys.stdin
try:
sys.stdin = open('/dev/stdin')
pdb.Pdb.interaction(self, *args, **kwargs)
finally:
sys.stdin = _stdin
Используйте его так же, как вы можете использовать классический Pdb:
ForkedPdb().set_trace()
Ответ 3
Это разработка ответа Romuald, который восстанавливает исходный stdin, используя его файловый дескриптор. Это приводит к тому, что readline работает внутри отладчика. Кроме того, специальное управление KeyboardInterrupt pdb отключено, чтобы не мешать многопроцессорному обработчику синтаксиса.
class ForkablePdb(pdb.Pdb):
_original_stdin_fd = sys.stdin.fileno()
_original_stdin = None
def __init__(self):
pdb.Pdb.__init__(self, nosigint=True)
def _cmdloop(self):
current_stdin = sys.stdin
try:
if not self._original_stdin:
self._original_stdin = os.fdopen(self._original_stdin_fd)
sys.stdin = self._original_stdin
self.cmdloop()
finally:
sys.stdin = current_stdin
Ответ 4
Основываясь на идее @memplex, мне пришлось изменить ее, чтобы она работала с joblib
, установив sys.stdin
в конструкторе, а также передав его напрямую через joblib.
import os
import pdb
import signal
import sys
import joblib
_original_stdin_fd = None
class ForkablePdb(pdb.Pdb):
_original_stdin = None
_original_pid = os.getpid()
def __init__(self):
pdb.Pdb.__init__(self)
if self._original_pid != os.getpid():
if _original_stdin_fd is None:
raise Exception("Must set ForkablePdb._original_stdin_fd to stdin fileno")
self.current_stdin = sys.stdin
if not self._original_stdin:
self._original_stdin = os.fdopen(_original_stdin_fd)
sys.stdin = self._original_stdin
def _cmdloop(self):
try:
self.cmdloop()
finally:
sys.stdin = self.current_stdin
def handle_pdb(sig, frame):
ForkablePdb().set_trace(frame)
def test(i, fileno):
global _original_stdin_fd
_original_stdin_fd = fileno
while True:
pass
if __name__ == '__main__':
print "PID: %d" % os.getpid()
signal.signal(signal.SIGUSR2, handle_pdb)
ForkablePdb().set_trace()
fileno = sys.stdin.fileno()
joblib.Parallel(n_jobs=2)(joblib.delayed(test)(i, fileno) for i in range(10))
Ответ 5
У меня была идея создать "фиктивные" классы, чтобы имитировать реализацию методов, которые вы используете из многопроцессорной обработки:
from multiprocessing import Pool
class DummyPool():
@staticmethod
def apply_async(func, args, kwds):
return DummyApplyResult(func(*args, **kwds))
def close(self): pass
def join(self): pass
class DummyApplyResult():
def __init__(self, result):
self.result = result
def get(self):
return self.result
def foo(a, b, switch):
# set trace when DummyPool is used
# import ipdb; ipdb.set_trace()
if switch:
return b - a
else:
return a - b
if __name__ == '__main__':
xml = etree.parse('C:/Users/anmendoza/Downloads/jim - 8.1/running-config.xml')
pool = DummyPool() # switch between Pool() and DummyPool() here
results = []
results.append(pool.apply_async(foo, args=(1, 100), kwds={'switch': True}))
pool.close()
pool.join()
results[0].get()
Ответ 6
Если вы находитесь на поддерживаемой платформе, попробуйте DTrace. Большинство семейств BSD/Solaris/OS X поддерживают DTrace.
Вот вступление автора. Вы можете использовать Dtrace для отладки практически всего.
Вот сообщение SO об обучении DTrace.