Ответ 1
Слишком много накапливается здесь, чтобы обращаться к комментариям, поэтому, где mp
есть multiprocessing
:
mp.cpu_count()
должно возвращать количество процессоров. Но проверьте это. Некоторые платформы напуганы, и эту информацию получить не всегда легко. Python делает все возможное.
Если вы начнете 24 процесса, они сделают именно то, что вы им скажите;-) Похоже, mp.Pool()
будет наиболее удобным для вас. Вы передаете количество процессов, которые хотите создать, в его конструктор. mp.Pool(processes=None)
будет использовать mp.cpu_count()
для количества процессоров.
Затем вы можете использовать, например, .imap_unordered(...)
экземпляр Pool
, чтобы распространить ваш degreelist
на все процессы. Или, может быть, какой-то другой метод Pool
будет работать лучше для вас - эксперимент.
Если вы не можете bash проблему в представлении Pool
мира, вы можете вместо этого создать mp.Queue
, чтобы создать рабочую очередь, .put()
'узлы (или узлы узлов), чтобы уменьшить накладные расходы) для работы в основной программе и записи рабочих в .get()
рабочих элементов с этой очереди. Спросите, нужны ли вам примеры. Обратите внимание, что вам нужно поместить контрольные значения (по одному на процесс) в очередь, после всех "реальных" рабочих элементов, чтобы рабочие процессы могли проверять, чтобы сторож знал, когда они будут сделаны.
FYI, мне нравятся очереди, потому что они более явные. Многие другие, например, Pool
лучше, потому что они более волшебны; -)
Пример бассейна
Вот исполняемый прототип для вас. Это показывает один способ использования imap_unordered
с Pool
и chunksize
, который не требует изменения каких-либо сигнатур функций. Конечно, вам придется подключить свой реальный код;-) Обратите внимание, что подход init_worker
позволяет передавать "большинство" аргументов только один раз на процессор, а не один раз для каждого элемента в вашем degreeslist
. Сокращение объема взаимодействия между процессами может иметь решающее значение для скорости.
import multiprocessing as mp
def init_worker(mps, fps, cut):
global memorizedPaths, filepaths, cutoff
global DG
print "process initializing", mp.current_process()
memorizedPaths, filepaths, cutoff = mps, fps, cut
DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)
def work(item):
_all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)
def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
pass # print "doing " + str(item)
if __name__ == "__main__":
m = mp.Manager()
memorizedPaths = m.dict()
filepaths = m.dict()
cutoff = 1 ##
# use all available CPUs
p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
filepaths,
cutoff))
degreelist = range(100000) ##
for _ in p.imap_unordered(work, degreelist, chunksize=500):
pass
p.close()
p.join()
Я настоятельно рекомендую запустить это точно как есть, так что вы можете видеть, что он быстро вспыхивает. Затем добавьте к нему немного времени, чтобы увидеть, как это влияет на время. Например, просто добавив
memorizedPaths[item] = item
to _all_simple_paths_graph()
сильно замедляет его. Зачем? Потому что дик становится все больше и больше с каждым добавлением, и этот безопасный процесс должен быть синхронизирован (под обложками) среди всех процессов. Единицей синхронизации является "весь dict" - нет внутренней структуры, которую может использовать mp-машина для выполнения дополнительных обновлений для общего dict.
Если вы не можете позволить себе этот расход, вы не можете использовать Manager.dict()
для этого. Возможности для умения изобилуют; -)