Максимальный размер для многопроцессорной обработки.
Я работаю над довольно большим проектом на Python, который требует, чтобы одна из вычислительных интенсивных фоновых задач была выгружена в другое ядро, так что основная служба не замедляется. Я столкнулся с каким-то странным поведением при использовании multiprocessing.Queue
для передачи результатов рабочего процесса. Используя одну и ту же очередь для threading.Thread
и multiprocessing.Process
для целей сравнения, поток работает очень хорошо, но процесс не может присоединиться после помещения большого элемента в очередь. Обратите внимание:
import threading
import multiprocessing
class WorkerThread(threading.Thread):
def __init__(self, queue, size):
threading.Thread.__init__(self)
self.queue = queue
self.size = size
def run(self):
self.queue.put(range(size))
class WorkerProcess(multiprocessing.Process):
def __init__(self, queue, size):
multiprocessing.Process.__init__(self)
self.queue = queue
self.size = size
def run(self):
self.queue.put(range(size))
if __name__ == "__main__":
size = 100000
queue = multiprocessing.Queue()
worker_t = WorkerThread(queue, size)
worker_p = WorkerProcess(queue, size)
worker_t.start()
worker_t.join()
print 'thread results length:', len(queue.get())
worker_p.start()
worker_p.join()
print 'process results length:', len(queue.get())
Я видел, что это отлично работает для size = 10000
, но висит на worker_p.join()
для size = 100000
. Есть ли какой-то собственный предел размера для того, что могут быть multiprocessing.Process
экземпляры в multiprocessing.Queue
? Или я делаю какую-то очевидную, фундаментальную ошибку здесь?
Для справки, я использую Python 2.6.5 на Ubuntu 10.04.
Ответы
Ответ 1
Кажется, что базовый канал заполнен, поэтому поток фидера блокируется при записи в канал (фактически, когда вы пытаетесь получить блокировку, защищающую канал от одновременного доступа).
Проверьте эту проблему http://bugs.python.org/issue8237
Ответ 2
По умолчанию maxsize of Queue бесконечен, но вы его преодолели. В вашем случае worker_p помещает элемент в очередь, очередь должна быть освобождена до вызова соединения. Подробнее см. Ссылку ниже.
https://docs.python.org/2/library/multiprocessing.html#programming-guidelines
Ответ 3
Ответ на многопроцессорность python: некоторые функции не возвращаются, когда они завершены (материал слишком большой) реализует то, что вы, вероятно, подразумеваете под "dequeuing" перед тем, как присоединиться к" при параллельном выполнении произвольного набора функций, возвращаемые значения которых попадают в очередь.
Таким образом, любой размер материала может быть помещен в очередь, так что предел, который вы нашли, не мешает.