Как убить процессы зомби, созданные модулем многопроцессорности?
Я очень новичок в модуле multiprocessing
. И я просто попытался создать следующее: у меня есть один процесс, который должен получить сообщение от RabbitMQ и передать его во внутреннюю очередь (multiprocessing.Queue
). Тогда я хочу сделать следующее: создать процесс при появлении нового сообщения. Он работает, но после завершения работы он оставляет процесс зомби, который не прерывается родителем. Вот мой код:
Основной процесс:
#!/usr/bin/env python
import multiprocessing
import logging
import consumer
import producer
import worker
import time
import base
conf = base.get_settings()
logger = base.logger(identity='launcher')
request_order_q = multiprocessing.Queue()
result_order_q = multiprocessing.Queue()
request_status_q = multiprocessing.Queue()
result_status_q = multiprocessing.Queue()
CONSUMER_KEYS = [{'queue':'product.order',
'routing_key':'product.order',
'internal_q':request_order_q}]
# {'queue':'product.status',
# 'routing_key':'product.status',
# 'internal_q':request_status_q}]
def main():
# Launch consumers
for key in CONSUMER_KEYS:
cons = consumer.RabbitConsumer(rabbit_q=key['queue'],
routing_key=key['routing_key'],
internal_q=key['internal_q'])
cons.start()
# Check reques_order_q if not empty spaw a process and process message
while True:
time.sleep(0.5)
if not request_order_q.empty():
handler = worker.Worker(request_order_q.get())
logger.info('Launching Worker')
handler.start()
if __name__ == "__main__":
main()
И вот мой рабочий:
import multiprocessing
import sys
import time
import base
conf = base.get_settings()
logger = base.logger(identity='worker')
class Worker(multiprocessing.Process):
def __init__(self, msg):
super(Worker, self).__init__()
self.msg = msg
self.daemon = True
def run(self):
logger.info('%s' % self.msg)
time.sleep(10)
sys.exit(1)
Итак, после обработки всех сообщений я вижу процессы с помощью команды ps aux
. Но я бы очень хотел, чтобы они были закончены.
Спасибо.
Ответы
Ответ 1
Несколько вещей:
Ответ 2
Использование multiprocessing.active_children
лучше, чем Process.join
. Функция active_children
очищает любые зомби, созданные с момента последнего вызова active_children
. Метод join
ожидает выбранного процесса. В течение этого времени другие процессы могут прекращаться и становиться зомби, но родительский процесс не будет замечать, пока не будет присоединен ожидаемый метод. Чтобы увидеть это в действии:
import multiprocessing as mp
import time
def main():
n = 3
c = list()
for i in xrange(n):
d = dict(i=i)
p = mp.Process(target=count, kwargs=d)
p.start()
c.append(p)
for p in reversed(c):
p.join()
print('joined')
def count(i):
print('{i} going to sleep'.format(i=i))
time.sleep(i * 10)
print('{i} woke up'.format(i=i))
if __name__ == '__main__':
main()
Вышеизложенное создаст 3 процесса, которые заканчиваются на 10 секунд друг от друга. Как и в коде, последний процесс соединяется первым, так что остальные два, которые раньше были закончены, будут зомби в течение 20 секунд. Вы можете увидеть их с помощью:
ps aux | grep Z
Не будет никаких зомби, если процессы будут ожидаться в последовательности, которую они завершат. Удалите reversed
, чтобы увидеть этот случай. Тем не менее, в реальных приложениях мы редко знаем последовательность, с которой дети заканчиваются, поэтому использование join
приведет к некоторым зомби.
Альтернатива active_children
не оставляет зомби.
В приведенном выше примере замените петлю for p in reversed(c):
на:
while True:
time.sleep(1)
if not mp.active_children():
break
и посмотрим, что произойдет.
Ответ 3
Используйте active_children.
multiprocessing.active_children