Python присоединяется к процессу, не блокируя родительский

Я пишу программу, которая будет смотреть конкретный каталог для новых файлов, содержащих URL загрузки. Как только новый файл будет обнаружен, он создаст новый процесс для фактической загрузки, пока родитель продолжит наблюдать за каталогом. Я использую интерфейс Process от multiprocessing. Проблема заключается в том, что, если я не вызову process.join(), дочерний процесс все еще запущен, но process.join() является блокирующей функцией, которая побеждает цель создания дочернего элемента для обработки фактической загрузки.

Мой вопрос в том, есть ли способ присоединиться к дочернему процессу неблокирующим образом, который позволит родителям продолжать делать свою работу?

Частичный код:

def main(argv):
  # parse command line args
  ...
  # set up variables
  ...
  watch_dir(watch_dir, download_dir)


def watch_dir(wDir, dDir):
  # Grab the current watch directory listing
  before = dict([(f, None) for f in os.listdir (wDir)])

  # Loop FOREVER
  while 1:
    # sleep for 10 secs
    time.sleep(10)

    # Grab the current dir listing
    after = dict([(f, None) for f in os.listdir (wDir)])

    # Get the list of new files
    added = [f for f in after if not f in before]
    # Get the list of deleted files
    removed = [f for f in before if not f in after]

    if added:
      # We have new files, do your stuff
      print "Added: ", ", ".join(added)

      # Call the new process for downloading
      p = Process(target=child, args=(added, wDir, dDir))
      p.start()
      p.join()

    if removed:
      # tell the user the file was deleted
      print "Removed: ", ", ".join(removed)

    # Set before to the current
    before = after

def child(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file and to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)

Родитель ждет, пока ребенок завершит выполнение, прежде чем продолжить после p.join(), который (насколько я могу судить) исправить. Но это побеждает всю цель создания ребенка. Если я останусь без p.join(), тогда ребенок останется активным, а python ps ax | grep даст мне "python <defunct> ".

Я хочу, чтобы ребенок закончил то, что делал, и ушел, не подняв родителя. Есть ли способ сделать это?

Ответы

Ответ 1

Вы можете настроить отдельный поток, который соединяет. Попросите его прослушать queue, в который вы нажимаете дескрипторы подпроцесса:

class Joiner(Thread):
    def __init__(self, q):
        self.__q = q
    def run(self):
        while True:
            child = self.__q.get()
            if child == None:
                return
            child.join()

Затем вместо p.join() сделайте joinq.put(p) и сделайте a joinq.put(None), чтобы сигнализировать о прекращении потока. Убедитесь, что вы используете очередь FIFO.

Ответ 2

В цикле while вызовите

multiprocessing.active_children()

Возвращает список всех живых детей текущего процесса. Вызов этого имеет побочный эффект от "присоединения" к уже завершенным процессам.

Ответ 3

Если вам не важно, когда и будет ли ребенок завершен, и вы просто хотите избежать того, чтобы ребенок закончил процесс зомби, тогда вы можете сделать двойную вилку, чтобы внук оказался ребенком init. В коде:

def child(*args):
  p = Process(target=grandchild, args=args)
  p.start()
  os._exit(0)

def grandchild(filename, wDir, dDir):
  # Open filename and extract the url
  ...
  # Download the file and to the dDir directory
  ...
  # Delete filename from the watch directory
  ...
  # exit cleanly
  os._exit(0)

Ответ 4

Вместо того, чтобы пытаться использовать shoehorn multiprocessing.Process() для работы, возможно, вам следует использовать другой инструмент, например apply_async() с многопроцессорной обработкой .Pool():

def main(argv):
    # parse command line args
    ...
    # set up variables
    ...

    # set up multiprocessing Pool
    pool = multiprocessing.Pool()

    try:
        watch_dir(watch_dir, download_dir, pool)

    # catch whatever kind of exception you expect to end your infinite loop
    # you can omit this try/except if you really think your script will 
    # run "forever" and you're okay with zombies should it crash
    except KeyboardInterrupt:
        pool.close()
        pool.join()

def watch_dir(wDir, dDir, pool):
    # Grab the current watch directory listing
    before = dict([(f, None) for f in os.listdir (wDir)])

    # Loop FOREVER
    while 1:
        # sleep for 10 secs
        time.sleep(10)

        # Grab the current dir listing
        after = dict([(f, None) for f in os.listdir (wDir)])

        # Get the list of new files
        added = [f for f in after if not f in before]
        # Get the list of deleted files
        removed = [f for f in before if not f in after]

        if added:
            # We have new files, do your stuff
            print "Added: ", ", ".join(added)

            # launch the function in a subprocess - this is NON-BLOCKING
            pool.apply_async(child, (added, wDir, dDir))

        if removed:
            # tell the user the file was deleted
            print "Removed: ", ", ".join(removed)

        # Set before to the current
        before = after

def child(filename, wDir, dDir):
    # Open filename and extract the url
    ...
    # Download the file and to the dDir directory
    ...
    # Delete filename from the watch directory
    ...
    # simply return to "exit cleanly"
    return

multiprocessing.Pool() - это пул рабочих подпроцессов, к которым вы можете отправить "задания". Вызов функции pool.apply_async() приводит к тому, что один из подпроцессов запускает вашу функцию с предоставленными аргументами асинхронно и не нуждается в объединении до тех пор, пока ваш script не будет выполнен со всей его работой и не закрывает весь пул. Библиотека управляет деталями для вас.

Я думаю, что это послужит вам лучше, чем текущий принятый ответ по следующим причинам:
1. Он устраняет ненужную сложность запуска дополнительных потоков и очередей только для управления подпроцессами.
2. Он использует библиотечные процедуры, которые сделаны специально для этой цели, поэтому вы получаете преимущества будущих улучшений в библиотеке.
3. ИМХО, он гораздо удобнее обслуживать.
4. Он более гибкий. Если однажды вы решите, что хотите увидеть возвращаемое значение из ваших подпроцессов, вы можете сохранить возвращаемое значение из вызова apply_async() (результат объекта) и проверяйте его, когда захотите. Вы можете хранить их в списке и обрабатывать их как пакет, когда ваш список превышает определенный размер. Вы можете переместить создание пула в функцию watch_dir() и покончить с try/except, если вам все равно, что произойдет, если цикл "бесконечный" будет прерван. Если вы помещаете какое-то условие прерывания в (в настоящее время) бесконечный цикл, вы можете просто добавить pool.close() и pool.join() после цикла, и все будет очищено.