Использование мультипроцессинга. Процесс с максимальным количеством одновременных процессов
У меня есть код Python
:
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
for i in range(0, MAX_PROCESSES):
p = Process(target=f, args=(i,))
p.start()
который работает хорошо. Однако MAX_PROCESSES
является переменной и может принимать любое значение от 1
до 512
. Поскольку я запускаю этот код только на машине с 8
ядрами, мне нужно выяснить, возможно ли ограничить число процессов, разрешенных для одновременной работы. Я изучил multiprocessing.Queue
, но он не похож на то, что мне нужно, или, возможно, я неправильно интерпретирую документы.
Есть ли способ ограничить количество одновременных multiprocessing.Process
процессов. multiprocessing.Process
запущен?
Ответы
Ответ 1
Было бы разумнее использовать multiprocessing.Pool
, который создает пул рабочих процессов на основе максимального количества ядер, доступных в вашей системе, а затем в основном загружает задачи, когда ядра становятся доступными.
Пример из стандартных документов (http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers) показывает, что вы также можете вручную установить количество ядер:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
И также полезно знать, что существует метод multiprocessing.cpu_count()
для подсчета количества ядер в данной системе, если это необходимо в вашем коде.
Изменить: вот какой код кода, который, кажется, работает для вашего конкретного случая:
import multiprocessing
def f(name):
print 'hello', name
if __name__ == '__main__':
pool = multiprocessing.Pool() #use all available cores, otherwise specify the number you want as an argument
for i in xrange(0, 512):
pool.apply_async(f, args=(i,))
pool.close()
pool.join()
Ответ 2
в целом, это также может выглядеть так:
import multiprocessing
def chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
numberOfThreads = 4
if __name__ == '__main__':
jobs = []
for i, param in enumerate(params):
p = multiprocessing.Process(target=f, args=(i,param))
jobs.append(p)
for i in chunks(jobs,numberOfThreads):
for j in i:
j.start()
for j in i:
j.join()
Конечно, этот способ довольно жесток (так как он ожидает каждого процесса в нежелательной, пока не продолжится со следующей частью). Тем не менее, он работает хорошо примерно для одинакового времени выполнения вызовов функций.
Ответ 3
Я думаю, что семафор - это то, что вы ищете, пример кода:
from multiprocessing import Semaphore
def f(name, sema):
print 'hello', name
sema.release()
if __name__ == '__main__':
concurrency = 20
total_task_num = 1000
sema = Semaphore(concurrency)
all_processes = []
for i in range(total_task_num):
sema.acquire()
p = Process(target=f, args=(i, sema))
all_processes.append(p)
p.start()
# inside main process, wait for all processes to finish
for p in all_processes:
p.join()
Другой способ, который может сделать код более структурированным, но потреблять слишком много ресурсов, если total_task_num
очень большой, заключается в следующем:
from multiprocessing import Semaphore
def f(name, sema):
sema.acquire()
print 'hello', name
sema.release()
if __name__ == '__main__':
concurrency = 20
total_task_num = 1000
sema = Semaphore(concurrency)
all_processes = []
for i in range(total_task_num):
p = Process(target=f, args=(i, sema))
all_processes.append(p)
p.start()
# inside main process, wait for all processes to finish
for p in all_processes:
p.join()