Multiprocessing.Pool() медленнее, чем просто использование обычных функций

(Этот вопрос касается того, как сделать multiprocessing.Pool() быстрее запускать код. Я, наконец, решил его, и окончательное решение можно найти в нижней части сообщения.)

Оригинальный вопрос:

Я пытаюсь использовать Python для сравнения слова со многими другими словами в списке и получения списка наиболее похожих. Для этого я использую функцию difflib.get_close_matches. Я нахожусь на относительно новом и мощном портативном компьютере Windows 7 с Python 2.6.5.

Я хочу ускорить процесс сравнения, потому что мой список сравнения слов очень длинный, и я должен повторить процесс сравнения несколько раз. Когда я услышал о модуле многопроцессорности, казалось логичным, что если сравнение можно разбить на рабочие задачи и запустить одновременно (и, таким образом, использовать мощность машины в обмен на более высокую скорость), моя задача сравнения завершится быстрее.

Однако даже после того, как вы попробовали много разных способов и использовали методы, которые были показаны в документах и ​​предложены в сообщениях на форуме, метод Pool просто кажется невероятно медленным, гораздо медленнее, чем просто запуск исходной функции get_close_matches на весь список сразу. Я хотел бы помочь понять, почему Pool() работает так медленно, и если я правильно его использую. Я использую только этот сценарий сравнения строк в качестве примера, потому что это самый последний пример, который я мог бы подумать о том, где я не мог понять или получить многопроцессорную работу, а не против меня. Ниже приведен пример кода из сценария difflib, показывающего разницу во времени между обычным и объединенным методами:

from multiprocessing import Pool
import random, time, difflib

# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"

# comparison function
def findclosematch(subwordlist):
    matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
    if matches <> []:
        return matches

# pool
print "pool method"
if __name__ == '__main__':
    pool = Pool(processes=3)
    t=time.time()
    result = pool.map_async(findclosematch, wordlist, chunksize=100)
    #do something with result
    for r in result.get():
        pass
    print time.time()-t

# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
    pass
print time.time()-t

Слово, которое нужно найти, это "привет", а список слов, в котором можно найти близкие совпадения, - это 1 миллион длинный список из 5 случайных символов (только для иллюстрации). Я использую 3 процессорных ядра и функцию карты с размером порядка 100 (списки, которые будут обрабатываться на одного рабочего, я думаю?) (Я также пробовал куски 1000 и 10 000, но не было никакой реальной разницы). Обратите внимание, что в обоих методах я запускаю таймер прямо перед вызовом моей функции и заканчиваю его сразу после прокрутки результатов. Как вы можете видеть ниже, результаты синхронизации явно соответствуют оригинальному методу, отличному от пула:

>>> 
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>> 

Метод пула почти в 4 раза медленнее исходного метода. Есть ли что-то, что мне не хватает здесь, или, может быть, непонимание того, как работает объединение/многопроцессорность? Я подозреваю, что часть проблемы здесь может заключаться в том, что функция map возвращает None и поэтому добавляет тысячи ненужных элементов в список результатов, хотя я хочу, чтобы фактические совпадения возвращались к результатам и записывали их как таковые в функции. Насколько я понимаю, именно так работает карта. Я слышал о некоторых других функциях, таких как фильтр, который собирает не-False результаты, но я не думаю, что многопроцессорность/пул поддерживает метод фильтра. Существуют ли какие-либо другие функции помимо map/imap в модуле многопроцессорности, которые могли бы помочь мне в возвращении только того, что возвращает моя функция? Применить функцию больше для предоставления нескольких аргументов, как я понимаю.

Я знаю там также функцию imap, которую я пробовал, но без каких-либо улучшений времени. Причина, по той же причине, почему у меня возникли проблемы с пониманием того, что так замечательно в модуле itertools, предположительно "молниеносно", что я заметил, верно для вызова функции, но по моему опыту и тому, что я читал, потому что вызов функции фактически не выполняет никаких вычислений, поэтому, когда пришло время перебирать результаты, чтобы собирать и анализировать их (без которых не было бы смысла вызывать cuntion), это занимает столько же или иногда больше времени, чем просто используя обычную версию функции. Но я полагаю, что для другого поста.

В любом случае, взволнованно видеть, может ли кто-то подтолкнуть меня в правильном направлении здесь и действительно оценить любую помощь в этом. Я больше заинтересован в понимании многопроцессорности в целом, чем для того, чтобы этот пример работал, хотя было бы полезно с некоторыми примерами предложений кода решения, чтобы помочь в моем понимании.

Ответ:

Кажется, что замедление связано с медленным временем запуска дополнительных процессов. Я не мог получить функцию .Pool(), чтобы быть достаточно быстрой. Моим окончательным решением сделать это быстрее было вручную разбить список рабочей нагрузки, использовать несколько .Process() вместо .Pool() и вернуть решения в очередь. Но мне кажется, что, возможно, самым важным изменением могло быть разделение рабочей нагрузки с точки зрения основного слова, а не слова, с которым можно сравнить, возможно, потому, что функция поиска difflib уже так быстро. Вот новый код, выполняющий 5 процессов одновременно, и оказался примерно на x10 быстрее, чем простой код (6 секунд против 55 секунд). Очень полезно для быстрых нечетких поисков, в дополнение к тому, как быстро это происходит.

from multiprocessing import Process, Queue
import difflib, random, time

def f2(wordlist, mainwordlist, q):
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)

if __name__ == '__main__':

    # constants (for 50 input words, find closest match in list of 100 000 comparison words)
    q = Queue()
    wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
    mainword = "hello"
    mainwordlist = [mainword for each in xrange(50)]

    # normal approach
    t = time.time()
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)
    print time.time()-t

    # split work into 5 or 10 processes
    processes = 5
    def splitlist(inlist, chunksize):
        return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
    print len(mainwordlist)/processes
    mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
    print "list ready"

    t = time.time()
    for submainwordlist in mainwordlistsplitted:
        print "sub"
        p = Process(target=f2, args=(wordlist,submainwordlist,q,))
        p.Daemon = True
        p.start()
    for submainwordlist in mainwordlistsplitted:
        p.join()
    print time.time()-t
    while True:
        print q.get()

Ответы

Ответ 1

Мое лучшее предположение - это непрофессиональная коммуникация (IPC). В экземпляре одного процесса единственный процесс имеет список слов. При делегировании другим процессам основной процесс должен постоянно передавать разделы списка другим процессам.

Таким образом, следует, что лучшим подходом может быть отключение n процессов, каждый из которых отвечает за загрузку/генерирование 1/n сегмента списка и проверку того, находится ли слово в этой части списка.

Я не уверен, как это сделать с библиотекой многопроцессорности Python.

Ответ 2

Эти проблемы обычно сводятся к следующему:

Функция, которую вы пытаетесь распараллелить, не требует ресурсов ЦП (т.е. Времени ЦП) для рационализации распараллеливания!

Конечно, когда вы распараллеливаетесь с multiprocessing.Pool(8), вы теоретически (но не практически) могли бы увеличить скорость в 8 раз.

Однако имейте в виду, что это не бесплатно - вы получаете это распараллеливание за счет следующих издержек:

  1. Создание task для каждого chunk (размера chunksize) в вашем iter переданном в Pool.map(f, iter)
  2. Для каждого task
    1. Сериализация task и возвращаемое значение task's (думаю, pickle.dumps())
    2. Десериализовать task и task's возвращаемое значение (думаю, pickle.loads())
    3. Waste значительного времени ожидания Locks на общих памяти Queues, в то время как рабочие процессы и родительские процессы get() и put() из/в эти Queues.
  3. os.fork() стоимость вызовов os.fork() для каждого рабочего процесса, что дорого.

По сути, при использовании Pool() вы хотите:

  1. Высокие требования к ресурсам процессора
  2. Низкий объем данных передается на каждый вызов функции
  3. Разумно долго iter, чтобы оправдать одноразовую стоимость (3) выше.

Для более подробного изучения этого поста и связанных с ним обсуждений рассказывается, как большие Pool.map() данных, передаваемые в Pool.map() (и друзьям), Pool.map() вам неприятности.

Раймонд Хеттингер также говорит о правильном использовании параллелизма Python здесь.

Ответ 3

Я испытал нечто похожее на Пул по другой проблеме. Я не уверен в реальной причине на данный момент...

Ответ, отредактированный OP Karim Bahgat, - это то же самое решение, что и для меня. После перехода в систему Process и Queue я смог увидеть ускорение в ряд с количеством ядер для машины.

Вот пример.

def do_something(data):
    return data * 2

def consumer(inQ, outQ):
    while True:
        try:
            # get a new message
            val = inQ.get()

            # this is the 'TERM' signal
            if val is None:
                break;

            # unpack the message
            pos = val[0]  # its helpful to pass in/out the pos in the array
            data = val[1]

            # process the data
            ret = do_something(data)

            # send the response / results
            outQ.put( (pos, ret) )


        except Exception, e:
            print "error!", e
            break

def process_data(data_list, inQ, outQ):
    # send pos/data to workers
    for i,dat in enumerate(data_list):
        inQ.put( (i,dat) )

    # process results
    for i in range(len(data_list)):
        ret = outQ.get()
        pos = ret[0]
        dat = ret[1]
        data_list[pos] = dat


def main():
    # initialize things
    n_workers = 4
    inQ = mp.Queue()
    outQ = mp.Queue()
    # instantiate workers
    workers = [mp.Process(target=consumer, args=(inQ,outQ))
               for i in range(n_workers)]

    # start the workers
    for w in workers:
        w.start()

    # gather some data
    data_list = [ d for d in range(1000)]

    # lets process the data a few times
    for i in range(4):
        process_data(data_list)

    # tell all workers, no more data (one msg for each)
    for i in range(n_workers):
        inQ.put(None)
    # join on the workers
    for w in workers:
        w.join()

    # print out final results  (i*16)
    for i,dat in enumerate(data_list):
        print i, dat