Python 3 Многопроцессорный тупик очереди при вызове соединения до того, как очередь пуста
У меня вопрос о понимании очереди в модуле multiprocessing
в python 3
Это то, что они говорят в правилах программирования:
Имейте в виду, что процесс, который поместил элементы в очередь, будет ждать раньше
до тех пор, пока все буферизованные элементы не будут подаваться нитью фидера
базовая труба. (Детский процесс может вызвать
Queue.cancel_join_thread
метод очереди, чтобы избежать такого поведения.)
Это означает, что всякий раз, когда вы используете очередь, вам нужно убедиться, что все
элементы, которые были помещены в очередь, в конечном итоге будут удалены до
процесс соединен. В противном случае вы не можете быть уверены, что процессы, которые
положить предметы в очередь завершатся. Помните также, что не-демонические
процессы будут автоматически соединены.
Примером может быть тупик:
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
Исправить здесь будет замена последних двух строк (или просто удаление
p.join()).
Поэтому, по-видимому, queue.get()
не следует вызывать после join()
.
Однако есть примеры использования очередей, где get
вызывается после join
, как:
import multiprocessing as mp
import random
import string
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
if __name__ == "__main__":
# Define an output queue
output = mp.Queue()
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output))
for x in range(2)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)
Я запустил эту программу, и она работает (также опубликовано как решение вопроса StackOverFlow Python 3 - Multiprocessing - Queue.get() не отвечает).
Может кто-нибудь помочь мне понять, что такое правило для тупика здесь?
Ответы
Ответ 1
Реализация очереди в многопроцессорной обработке, которая позволяет передавать данные между процессами, зависит от стандартных протоколов ОС.
Трубы OS не бесконечно длинны, поэтому процесс, который может блокировать данные в ОС во время операции put()
, пока какой-либо другой процесс не использует get()
для извлечения данных из очереди.
Для небольших объемов данных, таких как один в вашем примере, основной процесс может join()
все порожденные подпроцессы, а затем получать данные. Это часто работает хорошо, но не масштабируется, и неясно, когда он сломается.
Но это, безусловно, сломается с большими объемами данных. Подпроцесс будет заблокирован в put()
, ожидая, что основной процесс удалит некоторые данные из очереди с помощью get()
, но основной процесс заблокирован в join()
, ожидая завершения подпроцесса. Это приводит к тупиковой ситуации.
Вот пример, когда у пользователя была эта точная проблема. Я отправил некоторый код в ответ, который помог ему решить его проблему.
Ответ 2
Не вызывайте join()
объекта процесса, прежде чем вы получите все сообщения из общей очереди.
Я использовал следующее обходное решение, чтобы позволить процессам выйти до обработки всех его результатов:
results = []
while True:
try:
result = resultQueue.get(False, 0.01)
results.append(result)
except queue.Empty:
pass
allExited = True
for t in processes:
if t.exitcode is None:
allExited = False
break
if allExited & resultQueue.empty():
break
Он может быть сокращен, но я оставил его дольше, чтобы быть более понятным для новичков.
Здесь resultQueue
- это multiprocess.Queue
, который был разделен объектами multiprocess.Process
. После этого блока кода вы получите массив result
со всеми сообщениями из очереди.
Проблема заключается в том, что входной буфер очереди, который принимает сообщения, может стать полным, вызвав бесконечный блок (-ы) записи, пока не будет достаточно места для приема следующего сообщения. Таким образом, у вас есть три способа избежать блокировки:
- Увеличьте размер
multiprocessing.connection.BUFFER
(не очень хорошо)
- Уменьшить размер сообщения или его количество (не очень хорошо)
- Извлеките сообщения из очереди немедленно, когда они придут (хороший способ)
Ответ 3
Я пытался модуль Multiprocessing преобразовать список текстовых файлов в BERT Embedding.
Для каждого файла создается вложение BERT, но для определенного файла процесс не заканчивается.
Ранее я использовал операцию join() для завершения процессов, но раньше она заходила в тупик.
Итак, как предлагается здесь
Process.join() и Queue() не работают в случае больших чисел
Я сделал изменения в коде, чтобы заменить process.join()
from multiprocessing import Process
import multiprocessing
import time
import sys
def process(file,appended_data):
start = datetime.now()
file1_obj = open(form_path + file, 'r')
file1 = file1_obj.readlines()
file1_obj.close()
file11=[i.rstrip() for i in file1 if not(bool(not i or i.isspace()))]
file111=[' |||'.join(file11)]
try:
bc=BertClient()
embedding1=bc.encode(file111)
del bc
except ValueError: #some files have '' as their first strins in the list
embedding1=None
appended_data.put({file:embedding1})
print("finished %s"%file)
print(datetime.now()-start)
return appended_data
def embedding_dic(file_list):
procs = []
appended_data = multiprocessing.Queue()
print(file_list[0])
print(file_list)
for file in file_list:
procs.append(Process(target=process, args=(file,appended_data,)))
for proc in procs:
proc.start()
results = []
liveprocs = list(procs)
while liveprocs:
try:
while 1:
r=appended_data.get(False)
results.append(r)
except Exception:
pass
time.sleep(0.05) # Give tasks a chance to put more data in
if not appended_data.empty():
continue
liveprocs = [p for p in liveprocs if p.is_alive()]
print(liveprocs)
print(len(results))
return results
все же тупик случается в случае определенных файлов.
Описание ниже:
Выполнение функции embedding_dic для списка файлов приводит к
No of files available : 7
Files _names:
['0001368007_10-K_2007-03-22.txt', '0001368007_10-K_2008-03-25.txt', '0001368007_10-K_2009-02-27.txt', '0001368007_10-K_2010-03-01.txt', '0001368007_10-K_2011-02-28.txt', '0001368007_10-K_2012-02-29.txt', '0001368007_10-K_2012-02-29.txt']
Processes_started:
[<Process(Process-1899, started)>, <Process(Process-1900, started)>, <Process(Process-1901, started)>, <Process(Process-1902, started)>, <Process(Process-1903, started)>, <Process(Process-1904, started)>, <Process(Process-1905, started)>]
0
[<Process(Process-1899, started)>, <Process(Process-1900, started)>, <Process(Process-1901, started)>, <Process(Process-1902, started)>, <Process(Process-1903, started)>, <Process(Process-1904, started)>, <Process(Process-1905, started)>]
0
[<Process(Process-1899, started)>, <Process(Process-1900, started)>, <Process(Process-1901, started)>, <Process(Process-1902, started)>, <Process(Process-1903, started)>, <Process(Process-1904, started)>, <Process(Process-1905, started)>]
0
[<Process(Process-1899, started)>, <Process(Process-1900, started)>, <Process(Process-1901, started)>, <Process(Process-1902, started)>, <Process(Process-1903, started)>, <Process(Process-1904, started)>, <Process(Process-1905, started)>]
0
[<Process(Process-1899, started)>, <Process(Process-1900, started)>, <Process(Process-1901, started)>, <Process(Process-1902, started)>, <Process(Process-1903, started)>, <Process(Process-1904, started)>, <Process(Process-1905, started)>]
0
finished 0001368007_10-K_2009-02-27.txt
0:00:03.055049
finished 0001368007_10-K_2012-02-29.txt
0:00:03.023879
finished 0001368007_10-K_2012-02-29.txt
0:00:03.055496
finished 0001368007_10-K_2010-03-01.txt
0:00:03.096127
finished 0001368007_10-K_2011-02-28.txt
0:00:03.099099
[<Process(Process-1899, started)>, <Process(Process-1900, started)>]
5
finished 0001368007_10-K_2008-03-25.txt
0:00:04.473414
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
[<Process(Process-1899, started)>]
6
Process Process-1899:
File "/home/jovyan/.conda/envs/pycp_py3k/lib/python3.6/site-packages/bert_serving/client/__init__.py", line 206, in arg_wrapper
return func(self, *args, **kwargs)
[<Process(Process-1899, started)>]
6
Traceback (most recent call last):
File "/home/jovyan/.conda/envs/pycp_py3k/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/jovyan/.conda/envs/pycp_py3k/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "<ipython-input-315-ffe782d1c2f5>", line 12, in process
embedding1=bc.encode(file111)
File "/home/jovyan/.conda/envs/pycp_py3k/lib/python3.6/site-packages/bert_serving/client/__init__.py", line 291, in encode
r = self._recv_ndarray(req_id)
Таким образом, этот процесс находится в тупике с файлом 0001368007_10-K_2007-03-22.txt, когда в качестве входных данных указан список файлов.
В случае, если я пытаюсь только с тем же файлом, что и вход. Это заканчивается !!!
Он заканчивается даже в том случае, если количество файлов сохраняется до 5.
Даже для какого-то другого списка файлов, которые имеют файлы больше 7, например 10 или 12. Процесс завершается.
Я не могу отладить то же самое.
Еще один симптом, который я заметил, что
- если я через некоторое время перезапущу код, он завершится!
Помощь оценена!