Показывать ход вызова карты пула многопроцессорности пула Python?
У меня есть script, который успешно выполняет многопроцессорный набор пулов задач с вызовом imap_unordered()
:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Однако мой num_tasks
составляет около 250 000, и поэтому join()
блокирует основной поток в течение 10 секунд или около того, и я хотел бы иметь возможность эхо-вывода в командной строке постепенно, чтобы показать основной процесс не заблокирован. Что-то вроде:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
Есть ли метод для объекта результата или самого пула, который указывает количество оставшихся задач? Я попытался использовать объект multiprocessing.Value
в качестве счетчика (do_work
вызывает действие counter.value += 1
после выполнения своей задачи), но счетчик получает только ~ 85% от общего значения до остановки приращения.
Ответы
Ответ 1
Нет необходимости обращаться к приватным атрибутам набора результатов:
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
Ответ 2
Мой личный фаворит - дает вам хороший небольшой индикатор выполнения и завершение ETA, когда все работает и фиксируется параллельно.
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
Ответ 3
Нашел ответ самостоятельно с некоторым дополнительным копанием: взглянув на __dict__
объекта результата imap_unordered
, я обнаружил, что он имеет атрибут _index
, который увеличивается с каждым завершением задачи. Таким образом, это работает для ведения журнала, завернутого в цикл while
:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
Однако я обнаружил, что замена imap_unordered
на map_async
привела к тому, что более быстрое выполнение, хотя объект результата немного отличается. Вместо этого объект результата из map_async
имеет атрибут _number_left
и метод ready()
:
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
Ответ 4
Я обнаружил, что работа уже была сделана к тому времени, когда я попытался проверить ее прогресс. Это то, что работает для меня, используя tqdm.
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
Это должно работать со всеми разновидностями многопроцессорной обработки, независимо от того, блокируют они или нет.
Ответ 5
Я знаю, что это довольно старый вопрос, но вот что я делаю, когда хочу отслеживать прогресс пула задач в python.
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
В принципе, вы используете apply_async с callbak (в этом случае он должен добавить возвращаемое значение в список), поэтому вам не нужно ждать, чтобы сделать что-то еще. Затем, в течение цикла while, вы проверяете ход работы. В этом случае я добавил виджет, чтобы сделать его более приятным.
Выход:
4 of 4
['AA', 'BB', 'CC', 'DD']
Надеюсь, что это поможет.
Ответ 6
Я создал специальный класс для создания распечатки прогресса. Маби это помогает:
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
Ответ 7
Попробуйте этот простой подход на основе очереди, который также можно использовать с пулами. Помните, что распечатка чего-либо после инициализации индикатора выполнения приведет к его перемещению, по крайней мере, для данного индикатора выполнения. (PyPI прогресс 1.5)
import time
from progress.bar import Bar
def status_bar( queue_stat, n_groups, n ):
bar = Bar('progress', max = n)
finished = 0
while finished < n_groups:
while queue_stat.empty():
time.sleep(0.01)
gotten = queue_stat.get()
if gotten == 'finished':
finished += 1
else:
bar.next()
bar.finish()
def process_data( queue_data, queue_stat, group):
for i in group:
... do stuff resulting in new_data
queue_stat.put(1)
queue_stat.put('finished')
queue_data.put(new_data)
def multiprocess():
new_data = []
groups = [[1,2,3],[4,5,6],[7,8,9]]
combined = sum(groups,[])
queue_data = multiprocessing.Queue()
queue_stat = multiprocessing.Queue()
for i, group in enumerate(groups):
if i == 0:
p = multiprocessing.Process(target = status_bar,
args=(queue_stat,len(groups),len(combined)))
processes.append(p)
p.start()
p = multiprocessing.Process(target = process_data,
args=(queue_data, queue_stat, group))
processes.append(p)
p.start()
for i in range(len(groups)):
data = queue_data.get()
new_data += data
for p in processes:
p.join()
Ответ 8
По предложению Тима вы можете использовать tqdm
и imap
для решения этой проблемы. Я только что наткнулся на эту проблему и imap_unordered
решение imap_unordered
, чтобы получить доступ к результатам сопоставления. Вот как это работает:
from multiprocessing import Pool
import tqdm
pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
Если вам не нужны значения, возвращаемые вашими заданиями, вам не нужно присваивать список какой-либо переменной.