Python присоединяется к процессу, не блокируя родительский
Я пишу программу, которая будет смотреть конкретный каталог для новых файлов, содержащих URL загрузки. Как только новый файл будет обнаружен, он создаст новый процесс для фактической загрузки, пока родитель продолжит наблюдать за каталогом. Я использую интерфейс Process
от multiprocessing
. Проблема заключается в том, что, если я не вызову process.join(), дочерний процесс все еще запущен, но process.join() является блокирующей функцией, которая побеждает цель создания дочернего элемента для обработки фактической загрузки.
Мой вопрос в том, есть ли способ присоединиться к дочернему процессу неблокирующим образом, который позволит родителям продолжать делать свою работу?
Частичный код:
def main(argv):
# parse command line args
...
# set up variables
...
watch_dir(watch_dir, download_dir)
def watch_dir(wDir, dDir):
# Grab the current watch directory listing
before = dict([(f, None) for f in os.listdir (wDir)])
# Loop FOREVER
while 1:
# sleep for 10 secs
time.sleep(10)
# Grab the current dir listing
after = dict([(f, None) for f in os.listdir (wDir)])
# Get the list of new files
added = [f for f in after if not f in before]
# Get the list of deleted files
removed = [f for f in before if not f in after]
if added:
# We have new files, do your stuff
print "Added: ", ", ".join(added)
# Call the new process for downloading
p = Process(target=child, args=(added, wDir, dDir))
p.start()
p.join()
if removed:
# tell the user the file was deleted
print "Removed: ", ", ".join(removed)
# Set before to the current
before = after
def child(filename, wDir, dDir):
# Open filename and extract the url
...
# Download the file and to the dDir directory
...
# Delete filename from the watch directory
...
# exit cleanly
os._exit(0)
Родитель ждет, пока ребенок завершит выполнение, прежде чем продолжить после p.join()
, который (насколько я могу судить) исправить. Но это побеждает всю цель создания ребенка. Если я останусь без p.join()
, тогда ребенок останется активным, а python ps ax | grep
даст мне "python <defunct> ".
Я хочу, чтобы ребенок закончил то, что делал, и ушел, не подняв родителя. Есть ли способ сделать это?
Ответы
Ответ 1
Вы можете настроить отдельный поток, который соединяет. Попросите его прослушать queue, в который вы нажимаете дескрипторы подпроцесса:
class Joiner(Thread):
def __init__(self, q):
self.__q = q
def run(self):
while True:
child = self.__q.get()
if child == None:
return
child.join()
Затем вместо p.join()
сделайте joinq.put(p)
и сделайте a joinq.put(None)
, чтобы сигнализировать о прекращении потока. Убедитесь, что вы используете очередь FIFO.
Ответ 2
В цикле while вызовите
multiprocessing.active_children()
Возвращает список всех живых детей текущего процесса.
Вызов этого имеет побочный эффект от "присоединения" к уже завершенным процессам.
Ответ 3
Если вам не важно, когда и будет ли ребенок завершен, и вы просто хотите избежать того, чтобы ребенок закончил процесс зомби, тогда вы можете сделать двойную вилку, чтобы внук оказался ребенком init
. В коде:
def child(*args):
p = Process(target=grandchild, args=args)
p.start()
os._exit(0)
def grandchild(filename, wDir, dDir):
# Open filename and extract the url
...
# Download the file and to the dDir directory
...
# Delete filename from the watch directory
...
# exit cleanly
os._exit(0)
Ответ 4
Вместо того, чтобы пытаться использовать shoehorn multiprocessing.Process()
для работы, возможно, вам следует использовать другой инструмент, например apply_async()
с многопроцессорной обработкой .Pool():
def main(argv):
# parse command line args
...
# set up variables
...
# set up multiprocessing Pool
pool = multiprocessing.Pool()
try:
watch_dir(watch_dir, download_dir, pool)
# catch whatever kind of exception you expect to end your infinite loop
# you can omit this try/except if you really think your script will
# run "forever" and you're okay with zombies should it crash
except KeyboardInterrupt:
pool.close()
pool.join()
def watch_dir(wDir, dDir, pool):
# Grab the current watch directory listing
before = dict([(f, None) for f in os.listdir (wDir)])
# Loop FOREVER
while 1:
# sleep for 10 secs
time.sleep(10)
# Grab the current dir listing
after = dict([(f, None) for f in os.listdir (wDir)])
# Get the list of new files
added = [f for f in after if not f in before]
# Get the list of deleted files
removed = [f for f in before if not f in after]
if added:
# We have new files, do your stuff
print "Added: ", ", ".join(added)
# launch the function in a subprocess - this is NON-BLOCKING
pool.apply_async(child, (added, wDir, dDir))
if removed:
# tell the user the file was deleted
print "Removed: ", ", ".join(removed)
# Set before to the current
before = after
def child(filename, wDir, dDir):
# Open filename and extract the url
...
# Download the file and to the dDir directory
...
# Delete filename from the watch directory
...
# simply return to "exit cleanly"
return
multiprocessing.Pool()
- это пул рабочих подпроцессов, к которым вы можете отправить "задания". Вызов функции pool.apply_async()
приводит к тому, что один из подпроцессов запускает вашу функцию с предоставленными аргументами асинхронно и не нуждается в объединении до тех пор, пока ваш script не будет выполнен со всей его работой и не закрывает весь пул. Библиотека управляет деталями для вас.
Я думаю, что это послужит вам лучше, чем текущий принятый ответ по следующим причинам:
1. Он устраняет ненужную сложность запуска дополнительных потоков и очередей только для управления подпроцессами.
2. Он использует библиотечные процедуры, которые сделаны специально для этой цели, поэтому вы получаете преимущества будущих улучшений в библиотеке.
3. ИМХО, он гораздо удобнее обслуживать.
4. Он более гибкий. Если однажды вы решите, что хотите увидеть возвращаемое значение из ваших подпроцессов, вы можете сохранить возвращаемое значение из вызова apply_async()
(результат объекта) и проверяйте его, когда захотите. Вы можете хранить их в списке и обрабатывать их как пакет, когда ваш список превышает определенный размер. Вы можете переместить создание пула в функцию watch_dir()
и покончить с try/except, если вам все равно, что произойдет, если цикл "бесконечный" будет прерван. Если вы помещаете какое-то условие прерывания в (в настоящее время) бесконечный цикл, вы можете просто добавить pool.close()
и pool.join()
после цикла, и все будет очищено.