Multiprocessing.Pool() медленнее, чем просто использование обычных функций
(Этот вопрос касается того, как сделать multiprocessing.Pool() быстрее запускать код. Я, наконец, решил его, и окончательное решение можно найти в нижней части сообщения.)
Оригинальный вопрос:
Я пытаюсь использовать Python для сравнения слова со многими другими словами в списке и получения списка наиболее похожих. Для этого я использую функцию difflib.get_close_matches. Я нахожусь на относительно новом и мощном портативном компьютере Windows 7 с Python 2.6.5.
Я хочу ускорить процесс сравнения, потому что мой список сравнения слов очень длинный, и я должен повторить процесс сравнения несколько раз. Когда я услышал о модуле многопроцессорности, казалось логичным, что если сравнение можно разбить на рабочие задачи и запустить одновременно (и, таким образом, использовать мощность машины в обмен на более высокую скорость), моя задача сравнения завершится быстрее.
Однако даже после того, как вы попробовали много разных способов и использовали методы, которые были показаны в документах и предложены в сообщениях на форуме, метод Pool просто кажется невероятно медленным, гораздо медленнее, чем просто запуск исходной функции get_close_matches на весь список сразу. Я хотел бы помочь понять, почему Pool() работает так медленно, и если я правильно его использую. Я использую только этот сценарий сравнения строк в качестве примера, потому что это самый последний пример, который я мог бы подумать о том, где я не мог понять или получить многопроцессорную работу, а не против меня. Ниже приведен пример кода из сценария difflib, показывающего разницу во времени между обычным и объединенным методами:
from multiprocessing import Pool
import random, time, difflib
# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"
# comparison function
def findclosematch(subwordlist):
matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
if matches <> []:
return matches
# pool
print "pool method"
if __name__ == '__main__':
pool = Pool(processes=3)
t=time.time()
result = pool.map_async(findclosematch, wordlist, chunksize=100)
#do something with result
for r in result.get():
pass
print time.time()-t
# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
pass
print time.time()-t
Слово, которое нужно найти, это "привет", а список слов, в котором можно найти близкие совпадения, - это 1 миллион длинный список из 5 случайных символов (только для иллюстрации). Я использую 3 процессорных ядра и функцию карты с размером порядка 100 (списки, которые будут обрабатываться на одного рабочего, я думаю?) (Я также пробовал куски 1000 и 10 000, но не было никакой реальной разницы). Обратите внимание, что в обоих методах я запускаю таймер прямо перед вызовом моей функции и заканчиваю его сразу после прокрутки результатов. Как вы можете видеть ниже, результаты синхронизации явно соответствуют оригинальному методу, отличному от пула:
>>>
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>>
Метод пула почти в 4 раза медленнее исходного метода. Есть ли что-то, что мне не хватает здесь, или, может быть, непонимание того, как работает объединение/многопроцессорность? Я подозреваю, что часть проблемы здесь может заключаться в том, что функция map возвращает None и поэтому добавляет тысячи ненужных элементов в список результатов, хотя я хочу, чтобы фактические совпадения возвращались к результатам и записывали их как таковые в функции. Насколько я понимаю, именно так работает карта. Я слышал о некоторых других функциях, таких как фильтр, который собирает не-False результаты, но я не думаю, что многопроцессорность/пул поддерживает метод фильтра. Существуют ли какие-либо другие функции помимо map/imap в модуле многопроцессорности, которые могли бы помочь мне в возвращении только того, что возвращает моя функция? Применить функцию больше для предоставления нескольких аргументов, как я понимаю.
Я знаю там также функцию imap, которую я пробовал, но без каких-либо улучшений времени. Причина, по той же причине, почему у меня возникли проблемы с пониманием того, что так замечательно в модуле itertools, предположительно "молниеносно", что я заметил, верно для вызова функции, но по моему опыту и тому, что я читал, потому что вызов функции фактически не выполняет никаких вычислений, поэтому, когда пришло время перебирать результаты, чтобы собирать и анализировать их (без которых не было бы смысла вызывать cuntion), это занимает столько же или иногда больше времени, чем просто используя обычную версию функции. Но я полагаю, что для другого поста.
В любом случае, взволнованно видеть, может ли кто-то подтолкнуть меня в правильном направлении здесь и действительно оценить любую помощь в этом. Я больше заинтересован в понимании многопроцессорности в целом, чем для того, чтобы этот пример работал, хотя было бы полезно с некоторыми примерами предложений кода решения, чтобы помочь в моем понимании.
Ответ:
Кажется, что замедление связано с медленным временем запуска дополнительных процессов. Я не мог получить функцию .Pool(), чтобы быть достаточно быстрой. Моим окончательным решением сделать это быстрее было вручную разбить список рабочей нагрузки, использовать несколько .Process() вместо .Pool() и вернуть решения в очередь. Но мне кажется, что, возможно, самым важным изменением могло быть разделение рабочей нагрузки с точки зрения основного слова, а не слова, с которым можно сравнить, возможно, потому, что функция поиска difflib уже так быстро. Вот новый код, выполняющий 5 процессов одновременно, и оказался примерно на x10 быстрее, чем простой код (6 секунд против 55 секунд). Очень полезно для быстрых нечетких поисков, в дополнение к тому, как быстро это происходит.
from multiprocessing import Process, Queue
import difflib, random, time
def f2(wordlist, mainwordlist, q):
for mainword in mainwordlist:
matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
q.put(matches)
if __name__ == '__main__':
# constants (for 50 input words, find closest match in list of 100 000 comparison words)
q = Queue()
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
mainword = "hello"
mainwordlist = [mainword for each in xrange(50)]
# normal approach
t = time.time()
for mainword in mainwordlist:
matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
q.put(matches)
print time.time()-t
# split work into 5 or 10 processes
processes = 5
def splitlist(inlist, chunksize):
return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
print len(mainwordlist)/processes
mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
print "list ready"
t = time.time()
for submainwordlist in mainwordlistsplitted:
print "sub"
p = Process(target=f2, args=(wordlist,submainwordlist,q,))
p.Daemon = True
p.start()
for submainwordlist in mainwordlistsplitted:
p.join()
print time.time()-t
while True:
print q.get()
Ответы
Ответ 1
Мое лучшее предположение - это непрофессиональная коммуникация (IPC). В экземпляре одного процесса единственный процесс имеет список слов. При делегировании другим процессам основной процесс должен постоянно передавать разделы списка другим процессам.
Таким образом, следует, что лучшим подходом может быть отключение n процессов, каждый из которых отвечает за загрузку/генерирование 1/n сегмента списка и проверку того, находится ли слово в этой части списка.
Я не уверен, как это сделать с библиотекой многопроцессорности Python.
Ответ 2
Эти проблемы обычно сводятся к следующему:
Функция, которую вы пытаетесь распараллелить, не требует ресурсов ЦП (т.е. Времени ЦП) для рационализации распараллеливания!
Конечно, когда вы распараллеливаетесь с multiprocessing.Pool(8)
, вы теоретически (но не практически) могли бы увеличить скорость в 8 раз.
Однако имейте в виду, что это не бесплатно - вы получаете это распараллеливание за счет следующих издержек:
- Создание
task
для каждого chunk
(размера chunksize
) в вашем iter
переданном в Pool.map(f, iter)
- Для каждого
task
- Сериализация
task
и возвращаемое значение task's
(думаю, pickle.dumps()
) - Десериализовать
task
и task's
возвращаемое значение (думаю, pickle.loads()
) - Waste значительного времени ожидания
Locks
на общих памяти Queues
, в то время как рабочие процессы и родительские процессы get()
и put()
из/в эти Queues
.
-
os.fork()
стоимость вызовов os.fork()
для каждого рабочего процесса, что дорого.
По сути, при использовании Pool()
вы хотите:
- Высокие требования к ресурсам процессора
- Низкий объем данных передается на каждый вызов функции
- Разумно долго
iter
, чтобы оправдать одноразовую стоимость (3) выше.
Для более подробного изучения этого поста и связанных с ним обсуждений рассказывается, как большие Pool.map()
данных, передаваемые в Pool.map()
(и друзьям), Pool.map()
вам неприятности.
Раймонд Хеттингер также говорит о правильном использовании параллелизма Python здесь.
Ответ 3
Я испытал нечто похожее на Пул по другой проблеме. Я не уверен в реальной причине на данный момент...
Ответ, отредактированный OP Karim Bahgat, - это то же самое решение, что и для меня. После перехода в систему Process и Queue я смог увидеть ускорение в ряд с количеством ядер для машины.
Вот пример.
def do_something(data):
return data * 2
def consumer(inQ, outQ):
while True:
try:
# get a new message
val = inQ.get()
# this is the 'TERM' signal
if val is None:
break;
# unpack the message
pos = val[0] # its helpful to pass in/out the pos in the array
data = val[1]
# process the data
ret = do_something(data)
# send the response / results
outQ.put( (pos, ret) )
except Exception, e:
print "error!", e
break
def process_data(data_list, inQ, outQ):
# send pos/data to workers
for i,dat in enumerate(data_list):
inQ.put( (i,dat) )
# process results
for i in range(len(data_list)):
ret = outQ.get()
pos = ret[0]
dat = ret[1]
data_list[pos] = dat
def main():
# initialize things
n_workers = 4
inQ = mp.Queue()
outQ = mp.Queue()
# instantiate workers
workers = [mp.Process(target=consumer, args=(inQ,outQ))
for i in range(n_workers)]
# start the workers
for w in workers:
w.start()
# gather some data
data_list = [ d for d in range(1000)]
# lets process the data a few times
for i in range(4):
process_data(data_list)
# tell all workers, no more data (one msg for each)
for i in range(n_workers):
inQ.put(None)
# join on the workers
for w in workers:
w.join()
# print out final results (i*16)
for i,dat in enumerate(data_list):
print i, dat