Многопроцессорство в Python при ограничении количества запущенных процессов
Я хотел бы одновременно запускать несколько экземпляров program.py, одновременно ограничивая количество экземпляров, запущенных одновременно (например, количество ядер ЦП, доступных в моей системе). Например, если у меня есть 10 ядер и вам нужно выполнить 1000 запусков program.py, всего будет создано и запущено всего 10 экземпляров в любой момент времени.
Я пробовал использовать модуль многопроцессорности, многопоточность и использование очередей, но мне не показалось, что мне легко поддаться легкой реализации. Самая большая проблема, с которой я сталкиваюсь, - найти способ ограничить количество запущенных процессов одновременно. Это важно, потому что, если я создам 1000 процессов одновременно, это становится эквивалентным вилкой. Мне не нужны результаты, возвращенные из процессов программным способом (они выводятся на диск), и все процессы выполняются независимо друг от друга.
Может кто-нибудь, пожалуйста, дайте мне предложения или пример того, как я мог реализовать это в python или даже bash? Я бы опубликовал код, который я написал до сих пор, используя очереди, но он не работает так, как предполагалось, и может быть уже неверным путем.
Большое спасибо.
Ответы
Ответ 1
Я знаю, что вы упомянули, что подход Pool.map не имеет для вас никакого смысла. Карта - это просто простой способ дать ему источник работы и возможность обращения к каждому из элементов. func
для карты может быть любой точкой входа, чтобы выполнить фактическую работу над данным аргументом.
Если вам это не кажется правильным, у меня есть довольно подробный ответ об использовании шаблона Producer-Consumer: fooobar.com/questions/106290/...
По существу, вы создаете очередь и начинаете N число рабочих. Затем вы либо загружаете очередь из основного потока, либо создаете процесс Producer, который передает очередь. Работники просто продолжают работать из очереди, и никогда не будет более параллельной работы, чем количество запущенных вами процессов.
У вас также есть возможность установить лимит на очередь, чтобы он блокировал производителя, когда есть слишком много выдающейся работы, если вам нужно также установить ограничения на скорость и ресурсы, которые потребляет производитель.
Функция работы, вызываемая вызовом, может делать все, что вы хотите. Это может быть оболочка вокруг некоторой системной команды или она может импортировать вашу библиотеку python и запускать основную процедуру. Существуют специальные системы управления процессами, которые позволяют настраивать конфигурации для запуска произвольных исполняемых файлов в ограниченных ресурсах, но это всего лишь базовый подход к выполнению этого подхода на основе python.
Отрывки из этого моего другого ответа:
Основной пул:
from multiprocessing import Pool
def do_work(val):
# could instantiate some other library class,
# call out to the file system,
# or do something simple right here.
return "FOO: %s" % val
pool = Pool(4)
work = get_work_args()
results = pool.map(do_work, work)
Использование диспетчера процессов и производителя
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
# exit signal
if item == None:
return
# fake work
time.sleep(.5)
result = item
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
# this could also be started in a producer process
# instead of blocking
iters = itertools.chain(get_work_args(), (None,)*num_workers)
for item in iters:
work.put(item)
for p in pool:
p.join()
print results
Ответ 2
Вы должны использовать диспетчер процессов. Один из подходов будет использовать API, предоставляемый Circus, чтобы сделать это "программно", сайт документации теперь отключен, но я думаю, что его просто во всяком случае, вы можете использовать Цирк, чтобы справиться с этим. Другим подходом будет использование supervisord и установка параметра numprocs
процесса на количество ядер, которые у вас есть.
Пример использования Цирка:
from circus import get_arbiter
arbiter = get_arbiter("myprogram", numprocesses=3)
try:
arbiter.start()
finally:
arbiter.stop()
Ответ 3
Bash script, а не Python, но я часто использую его для простой параллельной обработки:
#!/usr/bin/env bash
waitForNProcs()
{
nprocs=$(pgrep -f $procName | wc -l)
while [ $nprocs -gt $MAXPROCS ]; do
sleep $SLEEPTIME
nprocs=$(pgrep -f $procName | wc -l)
done
}
SLEEPTIME=3
MAXPROCS=10
procName=myPython.py
for file in ./data/*.txt; do
waitForNProcs
./$procName $file &
done
Или для очень простых случаев другой параметр - xargs, где P задает количество procs
find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB
Ответ 4
В то время как есть много ответов об использовании multiprocessing.pool, не так много фрагментов кода о том, как использовать многопроцессорность. Процесс, который действительно более полезен при использовании памяти. запуск 1000 процессов приведет к перегрузке процессора и уничтожению памяти. Если каждый процесс и его конвейеры данных интенсивно занимаются памятью, OS или Python будут ограничивать количество параллельных процессов. Я разработал приведенный ниже код, чтобы ограничить одновременное количество заданий, переданных в CPU партиями. Размер партии можно масштабировать пропорционально количеству ядер ЦП. На моем ПК с Windows количество заданий на партию может быть эффективным в 4 раза выше, чем у процессора.
import multiprocessing
def func_to_be_multiprocessed(q):
q.put('s')
q = multiprocessing.Queue()
worker = []
for p in range(number_of_jobs):
worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \
args=(q,data)...))
num_cores = multiprocessing.cpu_count()
Scaling_factor_batch_jobs = 3.0
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
num_of_batches = number_of_jobs // num_jobs_per_batch
for i_batch in range(num_of_batches):
floor_job = i_batch * num_jobs_per_batch
ceil_job = floor_job + num_jobs_per_batch
for p in worker[floor_job : ceil_job]:
worker.start()
for p in worker[floor_job : ceil_job]:
worker.join()
for p in worker[ceil_job :]:
worker.start()
for p in worker[ceil_job :]:
worker.join()
for p in multiprocessing.active_children():
p.terminate()
result = []
for p in worker:
result.append(q.get())
Единственная проблема заключается в том, что если какая-либо работа в какой-либо партии не может завершиться и приведет к зависанию, остальные задания будут работать. Таким образом, функция, подлежащая обработке, должна иметь правильные процедуры обработки ошибок.