Пул python с рабочими процессами
Я пытаюсь использовать рабочий пул в python, используя объекты Process. Каждый рабочий (процесс) выполняет некоторую инициализацию (принимает нетривиальное количество времени), получает серию заданий (в идеале используя map()
) и возвращает что-то. За этим не требуется общение. Однако я не могу понять, как использовать функцию map() для использования моей рабочей функции compute()
.
from multiprocessing import Pool, Process
class Worker(Process):
def __init__(self):
print 'Worker started'
# do some initialization here
super(Worker, self).__init__()
def compute(self, data):
print 'Computing things!'
return data * data
if __name__ == '__main__':
# This works fine
worker = Worker()
print worker.compute(3)
# workers get initialized fine
pool = Pool(processes = 4,
initializer = Worker)
data = range(10)
# How to use my worker pool?
result = pool.map(compute, data)
Является ли очередь заданий вместо этого или я могу использовать map()
?
Ответы
Ответ 1
Я бы посоветовал вам использовать Очередь для этого.
class Worker(Process):
def __init__(self, queue):
super(Worker, self).__init__()
self.queue = queue
def run(self):
print('Worker started')
# do some initialization here
print('Computing things!')
for data in iter(self.queue.get, None):
# Use data
Теперь вы можете начинать их, получая работу из одной очереди
request_queue = Queue()
for i in range(4):
Worker(request_queue).start()
for data in the_real_source:
request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
request_queue.put(None)
Подобные вещи должны позволить вам амортизировать дорогую стоимость запуска для нескольких работников.
Ответ 2
initializer
ожидает произвольного вызываемого, который выполняет инициализацию, например, он может устанавливать некоторые глобальные переменные, а не подкласс Process
; map
принимает произвольный итерируемый:
#!/usr/bin/env python
import multiprocessing as mp
def init(val):
print('do some initialization here')
def compute(data):
print('Computing things!')
return data * data
def produce_data():
yield -100
for i in range(10):
yield i
yield 100
if __name__=="__main__":
p = mp.Pool(initializer=init, initargs=('arg',))
print(p.map(compute, produce_data()))
Ответ 3
Начиная с python 3.3 вы можете использовать starmap, а также для использования нескольких аргументов и получения результатов в очень упрощенном синтаксисе:
import multiprocessing
nb_cores = multiprocessing.cpu_count()
def caps(nb, letter):
print('Exec nb:', nb)
return letter.upper()
if __name__ == '__main__':
multiprocessing.freeze_support() # for Windows, also requires to be in the statement: if __name__ == '__main__'
input_data = ['a','b','c','d','e','f','g','h']
input_order = [1,2,3,4,5,6,7,8,9]
with multiprocessing.Pool(processes=nb_cores) as pool: # auto closing workers
results = pool.starmap(caps, zip(input_order, input_data))
print(results)