Python Multiprocessing: обработка ошибок для детей в родительском
В настоящее время я играю с многопроцессорностью и очередями.
Я написал фрагмент кода для экспорта данных из mongoDB, сопоставил его в реляционную (плоскую) структуру, преобразовал все значения в строку и вставлял их в mysql.
Каждый из этих шагов представляется в виде процесса и задает очереди импорта/экспорта, безопасные для экспорта mongoDB, которые обрабатываются в родительском.
Как вы увидите ниже, я использую очереди, а дочерние процессы завершаются, когда они читают "Нет" из очереди. Проблема, которую я имею в настоящее время, заключается в том, что если дочерний процесс запускается в необработанное Exception, это не распознается родителем, а остальные просто продолжают работать. То, что я хочу, это то, что весь shebang завершает работу и, в лучшем случае, ререйзирует ошибку ребенка.
У меня есть два вопроса:
- Как определить дочернюю ошибку в родительском?
- Как убить мои дочерние процессы после обнаружения ошибки (наилучшей практики)? Я понимаю, что поставить "Нет" в очередь, чтобы убить ребенка, довольно грязно.
Я использую python 2.7.
Вот основные части моего кода:
# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()
[...]
# create child processes
# all processes generated here are subclasses of "multiprocessing.Process"
# create mapper
mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
for i in range(10)]
# create datatype converter, converts everything to str
converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
for i in range(10)]
# create mysql writer
# I create a list of writers. currently only one,
# but I have the option to parallellize it further
writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
, columns, 'w_'+mysql_table, 1000) for i in range(1)]
# starting mapper
for mapper in mappers:
mapper.start()
time.sleep(1)
# starting converter
for converter in converters:
converter.start()
# starting writer
for writer in writers:
writer.start()
[... инициализация соединения mongo db...]
# put each dataset read to queue for the mapper
for row in mongo_collection.find({inc_column: {"$gte": start}}):
mongo_input_result_q.put(row)
count += 1
if count % log_counter == 0:
print 'Mongo Reader' + " " + str(count)
print "MongoReader done"
# Processes are terminated when they read "None" object from queue
# now that reading is finished, put None for each mapper in the queue so they terminate themselves
# the same for all followup processes
for mapper in mappers:
mongo_input_result_q.put(None)
for mapper in mappers:
mapper.join()
for converter in converters:
mapper_result_q.put(None)
for converter in converters:
converter.join()
for writer in writers:
converter_result_q.put(None)
for writer in writers:
writer.join()
Ответы
Ответ 1
Я не знаю стандартной практики, но я обнаружил, что для надежной многопроцессорности я разрабатываю методы/класс/и т.д. специально для работы с многопроцессорной обработкой. В противном случае вы никогда не знаете, что происходит с другой стороны (если я не пропустил какой-то механизм для этого).
В частности, я делаю это:
- Подкласс
multiprocessing.Process
или создавать функции, которые специально поддерживают многопроцессорность (функции обертывания, которые вы не имеете контроля в случае необходимости)
- всегда обеспечивает общую ошибку
multiprocessing.Queue
от основного процесса до каждого рабочего процесса.
- заключить весь код запуска в
try: ... except Exception as e
. Затем, когда произойдет что-то неожиданное, отправьте пакет ошибок с:
- идентификатор процесса, который умер
- исключение с его исходным контекстом (здесь). Исходный контекст действительно важен, если вы хотите записывать полезную информацию в основной процесс.
- конечно, обрабатывать ожидаемые проблемы как обычно при нормальной работе рабочего
- (похоже на то, что вы сказали уже), предполагая длительный процесс, завершите текущий код (внутри try/catch-all) с помощью цикла
- определить токен остановки в классе или для функций.
- Когда основной процесс хочет, чтобы работник (ов) остановился, просто отправьте маркер остановки. чтобы остановить всех, отправить достаточно для всех процессов.
- цикл упаковки проверяет вход q для токена или любой другой вход, который вы хотите
Конечным результатом являются рабочие процессы, которые могут длиться долгое время, и это может дать вам знать, что происходит, когда что-то идет не так. Они будут умирать спокойно, так как вы сможете справиться с тем, что вам нужно сделать после исключения, и вы также узнаете, когда вам нужно перезапустить рабочего.
Опять же, я только что пришел к этому шаблону с помощью проб и ошибок, поэтому я не знаю, насколько это стандартизировано. Помогает ли это с тем, что вы просите?
Ответ 2
Почему бы не позволить процессу заботиться о своих собственных исключениях, например:
import multiprocessing as mp
import traceback
class Process(mp.Process):
def __init__(self, *args, **kwargs):
mp.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = mp.Pipe()
self._exception = None
def run(self):
try:
mp.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
# raise e # You can still rise this exception if you need to
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
Теперь у вас есть как ошибка, так и трассировка в ваших руках:
def target():
raise ValueError('Something went wrong...')
p = Process(target = target)
p.start()
p.join()
if p.exception:
error, traceback = p.exception
print traceback
С уважением,
Marek
Ответ 3
Благодаря kobejohn я нашел решение, которое хорошо и стабильно.
-
Я создал подкласс multiprocessing.Process, который реализует некоторые функции и перезаписывает метод run()
для переноса нового метода saferun в блок try-catch. Для этого класса требуется инициализировать функцию обратной связи, которая используется для отправки информации, отладки, сообщений об ошибках родительской. Методы журнала в классе являются оболочками для глобально определенных функций журнала пакета:
class EtlStepProcess(multiprocessing.Process):
def __init__(self, feedback_queue):
multiprocessing.Process.__init__(self)
self.feedback_queue = feedback_queue
def log_info(self, message):
log_info(self.feedback_queue, message, self.name)
def log_debug(self, message):
log_debug(self.feedback_queue, message, self.name)
def log_error(self, err):
log_error(self.feedback_queue, err, self.name)
def saferun(self):
"""Method to be run in sub-process; can be overridden in sub-class"""
if self._target:
self._target(*self._args, **self._kwargs)
def run(self):
try:
self.saferun()
except Exception as e:
self.log_error(e)
raise e
return
-
Я подклассифицировал все мои другие шаги процесса из EtlStepProcess. Код, который должен быть запущен, реализуется в методе saferun(), а не выполняется. В этом случае мне не нужно добавлять вокруг него блок catch try, поскольку это уже выполняется методом run().
Пример:
class MySqlWriter(EtlStepProcess):
def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
input_queue, feedback_queue):
EtlStepProcess.__init__(self, feedback_queue)
self.mysql_host = mysql_host
self.mysql_user = mysql_user
self.mysql_passwd = mysql_passwd
self.mysql_schema = mysql_schema
self.mysql_table = mysql_table
self.columns = columns
self.commit_count = commit_count
self.input_queue = input_queue
def saferun(self):
self.log_info(self.name + " started")
#create mysql connection
engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
meta = sqlalchemy.MetaData()
table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
connection = engine.connect()
try:
self.log_info("start MySQL insert")
counter = 0
row_list = []
while True:
next_row = self.input_queue.get()
if isinstance(next_row, Terminator):
if counter % self.commit_count != 0:
connection.execute(table.insert(), row_list)
# Poison pill means we should exit
break
row_list.append(next_row)
counter += 1
if counter % self.commit_count == 0:
connection.execute(table.insert(), row_list)
del row_list[:]
self.log_debug(self.name + ' ' + str(counter))
finally:
connection.close()
return
-
В моем основном файле я отправляю процесс, который выполняет всю работу, и даёт ему feedback_queue. Этот процесс запускает все этапы, а затем считывает из mongoDB и помещает значения в начальную очередь. Мой основной процесс прослушивает очередь обратной связи и печатает все сообщения журнала. Если он получает журнал ошибок, он печатает ошибку и завершает ее дочерний элемент, который в свою очередь также прекращает все свои дочерние элементы перед смертью.
if __name__ == '__main__':
feedback_q = multiprocessing.Queue()
p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
p.start()
while p.is_alive():
fb = feedback_q.get()
if fb["type"] == "error":
p.terminate()
print "ERROR in " + fb["process"] + "\n"
for child in multiprocessing.active_children():
child.terminate()
else:
print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \
fb["process"] + ": " + fb["message"]
p.join()
Я думаю о том, чтобы сделать модуль из него и поставить его на github, но сначала мне нужно сначала очистить и комментировать.
Ответ 4
Решение @mrkwjc solution простое, его легко понять и реализовать, но у этого решения есть один недостаток. Когда у нас мало процессов, и мы хотим остановить все процессы, если какой-либо один процесс имеет ошибку, нам нужно подождать, пока все процессы не будут завершены, чтобы проверить, p.exception
. Ниже приведен код, который устраняет эту проблему (т.е. когда один дочерний элемент имеет ошибку, мы прекращаем также другой дочерний элемент):
import multiprocessing
import traceback
from time import sleep
class Process(multiprocessing.Process):
"""
Class which returns child Exceptions to Parent.
/info/249845/python-multiprocessing-handling-child-errors-in-parent/1286169#1286169
"""
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._parent_conn, self._child_conn = multiprocessing.Pipe()
self._exception = None
def run(self):
try:
multiprocessing.Process.run(self)
self._child_conn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._child_conn.send((e, tb))
# raise e # You can still rise this exception if you need to
@property
def exception(self):
if self._parent_conn.poll():
self._exception = self._parent_conn.recv()
return self._exception
class Task_1:
def do_something(self, queue):
queue.put(dict(users=2))
class Task_2:
def do_something(self, queue):
queue.put(dict(users=5))
def main():
try:
task_1 = Task_1()
task_2 = Task_2()
# Example of multiprocessing which is used:
# https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
task_1_queue = multiprocessing.Queue()
task_2_queue = multiprocessing.Queue()
task_1_process = Process(
target=task_1.do_something,
kwargs=dict(queue=task_1_queue))
task_2_process = Process(
target=task_2.do_something,
kwargs=dict(queue=task_2_queue))
task_1_process.start()
task_2_process.start()
while task_1_process.is_alive() or task_2_process.is_alive():
sleep(10)
if task_1_process.exception:
error, task_1_traceback = task_1_process.exception
# Do not wait until task_2 is finished
task_2_process.terminate()
raise ChildProcessError(task_1_traceback)
if task_2_process.exception:
error, task_2_traceback = task_2_process.exception
# Do not wait until task_1 is finished
task_1_process.terminate()
raise ChildProcessError(task_2_traceback)
task_1_process.join()
task_2_process.join()
task_1_results = task_1_queue.get()
task_2_results = task_2_queue.get()
task_1_users = task_1_results['users']
task_2_users = task_2_results['users']
except Exception:
# Here usually I send email notification with error.
print('traceback:', traceback.format_exc())
if __name__ == "__main__":
main()