Пул многопроцессорности python завершается
Я работаю над renderfarm, и мне нужны мои клиенты, чтобы иметь возможность запускать несколько экземпляров средства визуализации без блокировки, чтобы клиент мог получать новые команды. У меня это работает правильно, однако у меня возникают проблемы с завершением созданных процессов.
На глобальном уровне я определяю свой пул (чтобы я мог получить к нему доступ из любой функции):
p = Pool(2)
Затем я вызываю свой рендерер с помощью apply_async:
for i in range(totalInstances):
p.apply_async(render, (allRenderArgs[i],args[2]), callback=renderFinished)
p.close()
Эта функция завершается, запускает процессы в фоновом режиме и ждет новых команд. Я сделал простую команду, которая убьет клиента и остановит рендеринг:
def close():
'close this client instance'
tn.write ("say "+USER+" is leaving the farm\r\n")
try:
p.terminate()
except Exception,e:
print str(e)
sys.exit()
sys.exit()
Кажется, что он не дает ошибку (она выведет ошибку), python завершается, но фоновые процессы все еще работают. Может ли кто-нибудь рекомендовать лучший способ управления этими запущенными программами?
Ответы
Ответ 1
Нашел ответ на мой собственный вопрос. Основная проблема заключалась в том, что я вызывал стороннее приложение, а не функцию. Когда я вызываю подпроцесс [используя метод вызова() или Popen()], он создает новый экземпляр python, единственной целью которого является вызов нового приложения. Однако, когда выйдет python, он убьет этот новый экземпляр python и оставит приложение запущенным.
Решение состоит в том, чтобы сделать это трудным путем, найдя pid процесса python, который создан, получая детей из этого pid и убивая их. Этот код специфичен для osx; есть простой код (который не полагается на grep), доступный для Linux.
for process in pool:
processId = process.pid
print "attempting to terminate "+str(processId)
command = " ps -o pid,ppid -ax | grep "+str(processId)+" | cut -f 1 -d \" \" | tail -1"
ps_command = Popen(command, shell=True, stdout=PIPE)
ps_output = ps_command.stdout.read()
retcode = ps_command.wait()
assert retcode == 0, "ps command returned %d" % retcode
print "child process pid: "+ str(ps_output)
os.kill(int(ps_output), signal.SIGTERM)
os.kill(int(processId), signal.SIGTERM)
Ответ 2
Я нашел решение: остановите пул в отдельном потоке, например:
def close_pool():
global pool
pool.close()
pool.terminate()
pool.join()
def term(*args,**kwargs):
sys.stderr.write('\nStopping...')
# httpd.shutdown()
stophttp = threading.Thread(target=httpd.shutdown)
stophttp.start()
stoppool=threading.Thread(target=close_pool)
stoppool.daemon=True
stoppool.start()
signal.signal(signal.SIGTERM, term)
signal.signal(signal.SIGINT, term)
signal.signal(signal.SIGQUIT, term)
Работает нормально и всегда проверяется.
Ответ 3
Если вы все еще испытываете эту проблему, вы можете попробовать имитировать Pool
с демонстрационными процессами (при условии, что вы запускаете пул/процессов из недемонического процесса). Я сомневаюсь, что это лучшее решение, так как кажется, что ваши процессы Pool
должны быть завершены, но это все, что я мог придумать. Я не знаю, что делает ваш обратный вызов, поэтому я не уверен, где разместить его в моем примере ниже.
Я также предлагаю попробовать создать Pool
в __main__
из-за моего опыта (и документов) с необычностью, возникающей, когда процессы генерируются глобально. Это особенно актуально, если вы находитесь в Windows: http://docs.python.org/2/library/multiprocessing.html#windows
from multiprocessing import Process, JoinableQueue
# the function for each process in our pool
def pool_func(q):
while True:
allRenderArg, otherArg = q.get() # blocks until the queue has an item
try:
render(allRenderArg, otherArg)
finally: q.task_done()
# best practice to go through main for multiprocessing
if __name__=='__main__':
# create the pool
pool_size = 2
pool = []
q = JoinableQueue()
for x in range(pool_size):
pool.append(Process(target=pool_func, args=(q,)))
# start the pool, making it "daemonic" (the pool should exit when this proc exits)
for p in pool:
p.daemon = True
p.start()
# submit jobs to the queue
for i in range(totalInstances):
q.put((allRenderArgs[i], args[2]))
# wait for all tasks to complete, then exit
q.join()
Ответ 4
# -*- coding:utf-8 -*-
import multiprocessing
import time
import sys
import threading
from functools import partial
#> work func
def f(a,b,c,d,e):
print('start')
time.sleep(4)
print(a,b,c,d,e)
###########> subProcess func
#1. start a thead for work func
#2. waiting thead with a timeout
#3. exit the subProcess
###########
def mulPro(f, *args, **kwargs):
timeout = kwargs.get('timeout',None)
#1.
t = threading.Thread(target=f, args=args)
t.setDaemon(True)
t.start()
#2.
t.join(timeout)
#3.
sys.exit()
if __name__ == "__main__":
p = multiprocessing.Pool(5)
for i in range(5):
#1. process the work func with "subProcess func"
new_f = partial(mulPro, f, timeout=8)
#2. fire on
p.apply_async(new_f, args=(1,2,3,4,5),)
# p.apply_async(f, args=(1,2,3,4,5), timeout=2)
for i in range(10):
time.sleep(1)
print(i+1,"s")
p.close()
# p.join()