Как создавать параллельные дочерние процессы в многопроцессорной системе?
У меня есть Python script, который я хочу использовать в качестве контроллера для другого Python script. У меня есть сервер с 64 процессорами, поэтому вы хотите создать до 64 дочерних процессов этого второго Python script. Ребенок script вызывается:
$ python create_graphs.py --name=NAME
где NAME - это что-то вроде XYZ, ABC, NYU и т.д.
В моем родительском контроллере script я извлекаю переменную имени из списка:
my_list = [ 'XYZ', 'ABC', 'NYU' ]
Итак, мой вопрос: какой лучший способ вывести эти процессы в качестве детей? Я хочу ограничить число детей до 64 за раз, поэтому нужно отслеживать статус (если дочерний процесс завершен или нет), поэтому я могу эффективно поддерживать работу всего поколения.
Я изучил использование пакета подпроцессов, но отклонил его, потому что он порождает только одного ребенка за раз. Наконец, я нашел многопроцессорный пакет, но я признаю, что он перегружен всей документацией по потокам и подпроцессам.
В настоящее время мой script использует subprocess.call
только для порождения одного ребенка за раз и выглядит так:
#!/path/to/python
import subprocess, multiprocessing, Queue
from multiprocessing import Process
my_list = [ 'XYZ', 'ABC', 'NYU' ]
if __name__ == '__main__':
processors = multiprocessing.cpu_count()
for i in range(len(my_list)):
if( i < processors ):
cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
child = subprocess.call( cmd, shell=False )
Я действительно хочу, чтобы он породил 64 ребенка за раз. В других вопросах stackoverflow я видел людей, использующих Queue, но похоже, что это создает хит производительности?
Ответы
Ответ 1
Что вы ищете, это класс пул процессов в многопроцессорной обработке.
import multiprocessing
import subprocess
def work(cmd):
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
count = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=count)
print pool.map(work, ['ls'] * count)
И вот пример расчета, чтобы было легче понять. Следующее разделит 10000 задач на N процессов, где N - счет процессора. Обратите внимание, что я пропускаю None как количество процессов. Это приведет к тому, что класс Pool будет использовать cpu_count для количества процессов (ссылка)
import multiprocessing
import subprocess
def calculate(value):
return value * 10
if __name__ == '__main__':
pool = multiprocessing.Pool(None)
tasks = range(10000)
results = []
r = pool.map_async(calculate, tasks, callback=results.append)
r.wait() # Wait on the results
print results
Ответ 2
Вот решение, которое я придумал, основываясь на комментариях Надии и Джима. Я не уверен, что это лучший способ, но он работает. Исходный дочерний script должен быть оболочкой script, потому что мне нужно использовать некоторые сторонние приложения, включая Matlab. Поэтому я должен был вынуть его из Python и закодировать его в bash.
import sys
import os
import multiprocessing
import subprocess
def work(staname):
print 'Processing station:',staname
print 'Parent process:', os.getppid()
print 'Process id:', os.getpid()
cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (staname) ]
return subprocess.call(cmd, shell=False)
if __name__ == '__main__':
my_list = [ 'XYZ', 'ABC', 'NYU' ]
my_list.sort()
print my_list
# Get the number of processors available
num_processes = multiprocessing.cpu_count()
threads = []
len_stas = len(my_list)
print "+++ Number of stations to process: %s" % (len_stas)
# run until all the threads are done, and there is no data left
for list_item in my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if( len(threads) < num_processes ):
p = multiprocessing.Process(target=work,args=[list_item])
p.start()
print p, p.is_alive()
threads.append(p)
else:
for thread in threads:
if not thread.is_alive():
threads.remove(thread)
Это похоже на разумное решение? Я пытался использовать Jim в формате loop, но мой script просто ничего не возвращал. Я не знаю, почему это было бы. Вот результат, когда я запускаю script с Jim 'while', заменяя цикл 'for':
hostname{me}2% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
hostname{me}3%
Когда я запускаю его с циклом "for", я получаю что-то более значимое:
hostname{me}6% controller.py
['ABC', 'NYU', 'XYZ']
Number of processes: 64
+++ Number of stations to process: 3
Processing station: ABC
Parent process: 1056
Process id: 1068
Processing station: NYU
Parent process: 1056
Process id: 1069
Processing station: XYZ
Parent process: 1056
Process id: 1071
hostname{me}7%
Итак, это работает, и я счастлив. Тем не менее, я до сих пор не понимаю, почему я не могу использовать цикл Jim 'while' вместо цикла 'for', который я использую. Спасибо за всю помощь - меня впечатляет широта знаний @stackoverflow.
Ответ 3
Я бы определенно использовал multiprocessing вместо того, чтобы катить мое собственное решение с помощью подпроцесса.
Ответ 4
Я не думаю, что вам нужна очередь, если вы не собираетесь получать данные из приложений (что, если вам нужны данные, я думаю, что в любом случае может быть проще добавить их в базу данных)
но попробуйте это для размера:
поместите содержимое вашей create_graphs.py script все в функцию, называемую create_graphs
import threading
from create_graphs import create_graphs
num_processes = 64
my_list = [ 'XYZ', 'ABC', 'NYU' ]
threads = []
# run until all the threads are done, and there is no data left
while threads or my_list:
# if we aren't using all the processors AND there is still data left to
# compute, then spawn another thread
if (len(threads) < num_processes) and my_list:
t = threading.Thread(target=create_graphs, args=[ my_list.pop() ])
t.setDaemon(True)
t.start()
threads.append(t)
# in the case that we have the maximum number of threads check if any of them
# are done. (also do this when we run out of data, until all the threads are done)
else:
for thread in threads:
if not thread.isAlive():
threads.remove(thread)
Я знаю, что это приведет к тому, что на 1 меньше потоков, чем у процессоров, что, вероятно, хорошо, оно оставляет процессор для управления потоками, дисковыми вводами и другими вещами, происходящими на компьютере. Если вы решите, что хотите использовать последнее ядро, просто добавьте его к нему
edit. Я думаю, что, возможно, неверно истолковал цель my_list. Вам не нужно my_list
отслеживать потоки вообще (так как все они указаны в списке threads
). Но это прекрасный способ подавать входные процессы - или даже лучше: использовать функцию генератора;)
Цель my_list
и threads
my_list
содержит данные, которые необходимо обрабатывать в вашей функции
threads
- это всего лишь список текущих выполняемых потоков
цикл while выполняет две функции: запускает новые потоки для обработки данных и проверяет, выполняются ли какие-либо потоки.
Итак, если у вас есть (a) больше данных для обработки или (b) потоки, которые не закончены, вы хотите продолжить выполнение программы. Как только оба списка пусты, они будут оцениваться как False
, а цикл while выйдет из