Использование многопроцессорного пула работников
У меня есть следующий код, написанный для создания моего ленивого второго ядра ЦП. То, что делает код в основном, сначала находит нужные "морские" файлы в иерархии каталогов, а затем выполняет набор внешних скриптов для обработки этих двоичных "морских" файлов для получения от 50 до 100 текстовых и двоичных файлов в количестве. Поскольку название вопроса предполагает параллельный способ увеличить скорость обработки.
Этот вопрос проистекает из долгого обсуждения, которое у нас было в списке пользователей IPython под названием "" Не удается запустить ipcluster". Начиная с моих экспериментов по функциям параллельной обработки IPython.
Проблема в том, что я не могу правильно запустить этот код. Если в папках, содержащих "морские" файлы, есть только "морские" файлы, script завершает свое выполнение без полного выполнения внешних script запусков. (Скажем, у меня есть 30-50 внешних скриптов для запуска, но моя многопроцессорность включена script исчерпывается только после выполнения первого script в этой внешней цепочке script.) Интересно, если я запустил этот script на уже обработанной (который является "морским" файлом, обработанным заранее, а выходные файлы уже находятся в этой папке), тогда он запускается, но на этот раз я получаю ускорения в диапазоне от 2,4 до 2,7X относительно линейных таймингов обработки. Это не очень ожидаемо, так как у меня только есть процессор Core 2 Duo 2.5 Ghz на моем ноутбуке. Хотя у меня есть GPU с процессором CUDA, он не имеет ничего общего с моей текущей параллельной вычислительной борьбой:)
Как вы думаете, может быть источником этой проблемы?
Спасибо за все комментарии и предложения.
#!/usr/bin/env python
from multiprocessing import Pool
from subprocess import call
import os
def find_sea_files():
file_list, path_list = [], []
init = os.getcwd()
for root, dirs, files in os.walk('.'):
dirs.sort()
for file in files:
if file.endswith('.sea'):
file_list.append(file)
os.chdir(root)
path_list.append(os.getcwd())
os.chdir(init)
return file_list, path_list
def process_all(pf):
os.chdir(pf[0])
call(['postprocessing_saudi', pf[1]])
if __name__ == '__main__':
pool = Pool(processes=2) # start 2 worker processes
files, paths = find_sea_files()
pathfile = [[paths[i],files[i]] for i in range(len(files))]
pool.map(process_all, pathfile)
Ответы
Ответ 1
Я бы начал лучше понимать, что происходит с рабочим процессом. Многопроцессорный модуль поставляется с протоколированием для своих подпроцессов, если вам нужно. Поскольку вы упростили код для сужения проблемы, я бы просто отлаживал несколько инструкций печати, например (или вы можете PrettyPrint массива pf):
def process_all(pf):
print "PID: ", os.getpid()
print "Script Dir: ", pf[0]
print "Script: ", pf[1]
os.chdir(pf[0])
call(['postprocessing_saudi', pf[1]])
if __name__ == '__main__':
pool = Pool(processes=2)
files, paths = find_sea_files()
pathfile = [[paths[i],files[i]] for i in range(len(files))]
pool.map(process_all, pathfile, 1) # Ensure the chunk size is 1
pool.close()
pool.join()
Версия Python, которую я выполнил с помощью 2.6.4.
Ответ 2
Есть несколько вещей, о которых я могу думать:
1) Вы распечатали файлы путей? Вы уверены, что все они правильно сгенерированы?
a) Я спрашиваю, как ваш os.walk немного интересен; dirs.sort() должен быть в порядке, но кажется совершенно необработанным. os.chdir() вообще не следует использовать; восстановление должно быть в порядке, но в целом вы должны просто добавить root в init.
2) Я видел, что многопроцессорность на python2.6 имеет проблемы, порождающие подпоры из пулов. (Я специально использовал script многопроцессорную обработку для подпроцессов. Эти подпроцессы тогда не могли правильно использовать многопроцессорность (пул заблокирован)). Попробуйте python2.5 w/mulitprocessing backport.
3) Попробуйте picloud модуль cloud.mp(который переносит многопроцессорность, но обрабатывает пулы несколько иначе) и посмотрите, работает ли это,
Вы бы сделали
cloud.mp.join(cloud.mp.map(process_all, pathfile))
(Отказ от ответственности: я являюсь одним из разработчиков PiCloud)