Как создавать параллельные дочерние процессы в многопроцессорной системе?

У меня есть Python script, который я хочу использовать в качестве контроллера для другого Python script. У меня есть сервер с 64 процессорами, поэтому вы хотите создать до 64 дочерних процессов этого второго Python script. Ребенок script вызывается:

$ python create_graphs.py --name=NAME

где NAME - это что-то вроде XYZ, ABC, NYU и т.д.

В моем родительском контроллере script я извлекаю переменную имени из списка:

my_list = [ 'XYZ', 'ABC', 'NYU' ]

Итак, мой вопрос: какой лучший способ вывести эти процессы в качестве детей? Я хочу ограничить число детей до 64 за раз, поэтому нужно отслеживать статус (если дочерний процесс завершен или нет), поэтому я могу эффективно поддерживать работу всего поколения.

Я изучил использование пакета подпроцессов, но отклонил его, потому что он порождает только одного ребенка за раз. Наконец, я нашел многопроцессорный пакет, но я признаю, что он перегружен всей документацией по потокам и подпроцессам.

В настоящее время мой script использует subprocess.call только для порождения одного ребенка за раз и выглядит так:

#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process

my_list = [ 'XYZ', 'ABC', 'NYU' ]

if __name__ == '__main__':
    processors = multiprocessing.cpu_count()

    for i in range(len(my_list)):
        if( i < processors ):
             cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
             child = subprocess.call( cmd, shell=False )

Я действительно хочу, чтобы он породил 64 ребенка за раз. В других вопросах stackoverflow я видел людей, использующих Queue, но похоже, что это создает хит производительности?

Ответы

Ответ 1

Что вы ищете, это класс пул процессов в многопроцессорной обработке.

import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

И вот пример расчета, чтобы было легче понять. Следующее разделит 10000 задач на N процессов, где N - счет процессора. Обратите внимание, что я пропускаю None как количество процессов. Это приведет к тому, что класс Pool будет использовать cpu_count для количества процессов (ссылка)

import multiprocessing
import subprocess

def calculate(value):
    return value * 10

if __name__ == '__main__':
    pool = multiprocessing.Pool(None)
    tasks = range(10000)
    results = []
    r = pool.map_async(calculate, tasks, callback=results.append)
    r.wait() # Wait on the results
    print results

Ответ 2

Вот решение, которое я придумал, основываясь на комментариях Надии и Джима. Я не уверен, что это лучший способ, но он работает. Исходный дочерний script должен быть оболочкой script, потому что мне нужно использовать некоторые сторонние приложения, включая Matlab. Поэтому я должен был вынуть его из Python и закодировать его в bash.

import sys
import os
import multiprocessing
import subprocess

def work(staname):
    print 'Processing station:',staname
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

Это похоже на разумное решение? Я пытался использовать Jim в формате loop, но мой script просто ничего не возвращал. Я не знаю, почему это было бы. Вот результат, когда я запускаю script с Jim 'while', заменяя цикл 'for':

hostname{me}2% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%

Когда я запускаю его с циклом "for", я получаю что-то более значимое:

hostname{me}6% controller.py 
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%

Итак, это работает, и я счастлив. Тем не менее, я до сих пор не понимаю, почему я не могу использовать цикл Jim 'while' вместо цикла 'for', который я использую. Спасибо за всю помощь - меня впечатляет широта знаний @stackoverflow.

Ответ 3

Я бы определенно использовал multiprocessing вместо того, чтобы катить мое собственное решение с помощью подпроцесса.

Ответ 4

Я не думаю, что вам нужна очередь, если вы не собираетесь получать данные из приложений (что, если вам нужны данные, я думаю, что в любом случае может быть проще добавить их в базу данных)

но попробуйте это для размера:

поместите содержимое вашей create_graphs.py script все в функцию, называемую create_graphs

import threading
from create_graphs import create_graphs

num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]

threads = []

# run until all the threads are done, and there is no data left
while threads or my_list:

    # if we aren't using all the processors AND there is still data left to
    # compute, then spawn another thread
    if (len(threads) < num_processes) and my_list:
        t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
        t.setDaemon(True)
        t.start()
        threads.append(t)

    # in the case that we have the maximum number of threads check if any of them
    # are done. (also do this when we run out of data, until all the threads are done)
    else:
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)

Я знаю, что это приведет к тому, что на 1 меньше потоков, чем у процессоров, что, вероятно, хорошо, оно оставляет процессор для управления потоками, дисковыми вводами и другими вещами, происходящими на компьютере. Если вы решите, что хотите использовать последнее ядро, просто добавьте его к нему

edit. Я думаю, что, возможно, неверно истолковал цель my_list. Вам не нужно my_list отслеживать потоки вообще (так как все они указаны в списке threads). Но это прекрасный способ подавать входные процессы - или даже лучше: использовать функцию генератора;)

Цель my_list и threads

my_list содержит данные, которые необходимо обрабатывать в вашей функции
threads - это всего лишь список текущих выполняемых потоков

цикл while выполняет две функции: запускает новые потоки для обработки данных и проверяет, выполняются ли какие-либо потоки.

Итак, если у вас есть (a) больше данных для обработки или (b) потоки, которые не закончены, вы хотите продолжить выполнение программы. Как только оба списка пусты, они будут оцениваться как False, а цикл while выйдет из