Как использовать все ядра с многопроцессорной обработкой python

теперь работают с функцией Python multicore в течение более часа, пытаясь распараллелить довольно сложную функцию обхода графика с помощью Process и Manager:

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel = True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

#checks if 'memorizedPaths exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__), 'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),key=itemgetter(1),reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: #len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    #writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) +"path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=' ', quotechar='|')
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths

############################################################################

start = time.clock()
if __name__ == '__main__':
    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph, args=(DG, cutoff, item, memorizedPaths, filepaths))
        test.start()
        test.join()
end = time.clock()
print (end-start)

В настоящее время - хотя удача и магия - это работает (вроде). Моя проблема в том, что я использую только 12 из моих 24 ядер.

Может кто-нибудь объяснить, почему это может быть так? Возможно, мой код - не лучшее многопроцессорное решение, или это особенность моей архитектуры [Intel Xeon CPU E5-2640 @2.50GHz x18 работает на Ubuntu 13.04 x64]?

EDIT:

Мне удалось получить:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph, args=(DG, cutoff, item, memorizedPaths, filepaths))
p.close()
p.join()

Работая, однако, ОЧЕНЬ МЕДЛЕННО! Поэтому я предполагаю, что я использую неправильную функцию для работы. надеюсь, это поможет прояснить то, что я пытаюсь выполнить!

EDIT2: .map попытка:

partialfunc = partial(_all_simple_paths_graph, DG=DG, cutoff=cutoff, memorizedPaths=memorizedPaths, filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()

Работает медленнее, чем singlecore. Время для оптимизации!

Ответы

Ответ 1

Слишком много накапливается здесь, чтобы обращаться к комментариям, поэтому, где mp есть multiprocessing:

mp.cpu_count() должно возвращать количество процессоров. Но проверьте это. Некоторые платформы напуганы, и эту информацию получить не всегда легко. Python делает все возможное.

Если вы начнете 24 процесса, они сделают именно то, что вы им скажите;-) Похоже, mp.Pool() будет наиболее удобным для вас. Вы передаете количество процессов, которые хотите создать, в его конструктор. mp.Pool(processes=None) будет использовать mp.cpu_count() для количества процессоров.

Затем вы можете использовать, например, .imap_unordered(...) экземпляр Pool, чтобы распространить ваш degreelist на все процессы. Или, может быть, какой-то другой метод Pool будет работать лучше для вас - эксперимент.

Если вы не можете bash проблему в представлении Pool мира, вы можете вместо этого создать mp.Queue, чтобы создать рабочую очередь, .put() 'узлы (или узлы узлов), чтобы уменьшить накладные расходы) для работы в основной программе и записи рабочих в .get() рабочих элементов с этой очереди. Спросите, нужны ли вам примеры. Обратите внимание, что вам нужно поместить контрольные значения (по одному на процесс) в очередь, после всех "реальных" рабочих элементов, чтобы рабочие процессы могли проверять, чтобы сторож знал, когда они будут сделаны.

FYI, мне нравятся очереди, потому что они более явные. Многие другие, например, Pool лучше, потому что они более волшебны; -)

Пример бассейна

Вот исполняемый прототип для вас. Это показывает один способ использования imap_unordered с Pool и chunksize, который не требует изменения каких-либо сигнатур функций. Конечно, вам придется подключить свой реальный код;-) Обратите внимание, что подход init_worker позволяет передавать "большинство" аргументов только один раз на процессор, а не один раз для каждого элемента в вашем degreeslist. Сокращение объема взаимодействия между процессами может иметь решающее значение для скорости.

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

Я настоятельно рекомендую запустить это точно как есть, так что вы можете видеть, что он быстро вспыхивает. Затем добавьте к нему немного времени, чтобы увидеть, как это влияет на время. Например, просто добавив

   memorizedPaths[item] = item

to _all_simple_paths_graph() сильно замедляет его. Зачем? Потому что дик становится все больше и больше с каждым добавлением, и этот безопасный процесс должен быть синхронизирован (под обложками) среди всех процессов. Единицей синхронизации является "весь dict" - нет внутренней структуры, которую может использовать mp-машина для выполнения дополнительных обновлений для общего dict.

Если вы не можете позволить себе этот расход, вы не можете использовать Manager.dict() для этого. Возможности для умения изобилуют; -)