Могу ли я использовать многопроцессорную очередь в функции, называемой пулом Pool.imap?
Я использую python 2.7 и пытаюсь запустить некоторые сложные задачи процессора в своих собственных процессах. Я хотел бы иметь возможность отправлять сообщения обратно в родительский процесс, чтобы информировать его о текущем состоянии процесса. Очередь многопроцессорности кажется идеальной для этого, но я не могу понять, как заставить ее работать.
Итак, это мой основной рабочий пример минус использование очереди.
import multiprocessing as mp
import time
def f(x):
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print str(results.next())
pool.close()
pool.join()
if __name__ == '__main__':
main()
Я попытался передать очередь несколькими способами, и они получили сообщение об ошибке "RuntimeError: объекты Queue должны делиться только между процессами через наследование". Вот один из способов, которым я попытался, основываясь на более раннем ответе, который я нашел. (Я получаю ту же проблему, пытаясь использовать Pool.map_async и Pool.imap)
import multiprocessing as mp
import time
def f(args):
x = args[0]
q = args[1]
q.put(str(x))
time.sleep(0.1)
return x*x
def main():
q = mp.Queue()
pool = mp.Pool()
results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))
print str(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
Наконец, подход 0 для фитнеса (сделать его глобальным) не генерирует никаких сообщений, он просто блокируется.
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print q.get()
pool.close()
pool.join()
if __name__ == '__main__':
main()
Я знаю, что он, вероятно, будет работать с multiprocessing.Process напрямую и что есть другие библиотеки, чтобы выполнить это, но я ненавижу отказываться от стандартных функций библиотеки, которые отлично подходят, пока я не уверен, что это не просто мое отсутствие знаний не позволяет мне эксплуатировать их.
Спасибо.
Ответы
Ответ 1
Трюк состоит в том, чтобы передать очередь в качестве аргумента инициализатору. Появляется для работы со всеми способами отправки пула.
import multiprocessing as mp
def f(x):
f.q.put('Doing: ' + str(x))
return x*x
def f_init(q):
f.q = q
def main():
jobs = range(1,6)
q = mp.Queue()
p = mp.Pool(None, f_init, [q])
results = p.imap(f, jobs)
p.close()
for i in range(len(jobs)):
print q.get()
print results.next()
if __name__ == '__main__':
main()
Ответ 2
Здравствуйте, я пытаюсь следующий код, чтобы получить процесс инвертирования: очистка очереди внутри процессов
import multiprocessing as mp
from random import *
def f(x):
myId=random()
output=[]
while not f.q.empty():
z=f.q.get()
print (str(myId)+"#"+str(z))
output.append(z)
print(str(myId)+" ok")
return output
def f_init(q):
f.q = q
def main():
jobs = []
q = mp.Queue()
for x in range(10):
q.put([x,2*x,3*x])
jobs.append(x)
p = mp.Pool(None, f_init, [q])
results = p.imap(f, jobs)
p.close()
for bResult in results:
print(bResult)
if __name__ == '__main__':
main()
У меня нет ошибок, но пул, кажется, никогда не заканчивается. Не весь процесс закончен. Я могу это исправить?
Ответ 3
странно: если я удаленно печатаю статистику в функции f (x), она работает...
def f(x):
myId=random()
output=[]
while not f.q.empty():
z=f.q.get()
output.append(z)
return output