Почему в python map() и multiprocessing.Pool.map() получили разные ответы?
У меня была странная проблема. У меня есть файл формата:
START
1
2
STOP
lllllllll
START
3
5
6
STOP
и я хочу читать строки между START
и STOP
в качестве блоков и использовать my_f
для обработки каждого блока.
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=itertools.takewhile(lambda x:x!='STOP',lines)
yield block
и в моей основной функции я попытался использовать map()
, чтобы выполнить эту работу. Это сработало.
blocks=block_generator(file)
map(my_f,blocks)
действительно даст мне то, что я хочу. Но когда я пробовал то же самое с multiprocessing.Pool.map()
, он дал мне ошибку, сказав, что takewhile() хочет взять 2 аргумента, было дано 0.
blocks=block_generator(file)
p=multiprocessing.Pool(4)
p.map(my_f,blocks)
Это ошибка?
- Файл содержит более 1000000 блоков, каждый из которых имеет менее 100 строк.
- Я принимаю форму ответа untubu.
- Но, может быть, я просто разделил файл и использовал n экземпляр моего оригинального script без многопроцессорной обработки для их обработки, затем сгруппировал результаты вместе. Таким образом, вы никогда не ошибетесь, пока script работает с небольшим файлом.
Ответы
Ответ 1
Как насчет:
import itertools
def grouper(n, iterable, fillvalue=None):
# Source: http://docs.python.org/library/itertools.html#recipes
"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
return itertools.izip_longest(*[iter(iterable)]*n,fillvalue=fillvalue)
def block_generator(file):
with open(file) as lines:
for line in lines:
if line == 'START':
block=list(itertools.takewhile(lambda x:x!='STOP',lines))
yield block
blocks=block_generator(file)
p=multiprocessing.Pool(4)
for chunk in grouper(100,blocks,fillvalue=''):
p.map(my_f,chunk)
Использование grouper
ограничит количество файлов, потребляемых p.map
. Таким образом, весь файл не обязательно нужно считывать в память (загружать в очередь задач) сразу.
Я утверждаю, что когда вы вызываете p.map(func,iterator)
, весь итератор потребляется немедленно, чтобы заполнить очередь задач. Затем работники пула получают задания из очереди и работают на рабочих местах одновременно.
Если вы заглянете внутрь pool.py и проследите через определения, вы увидите
поток _handle_tasks
получает элементы из self._taskqueue
и перечисляет это сразу:
for i, task in enumerate(taskseq):
...
put(task)
Вывод: итератор, переданный в p.map
, сразу потребляется. Нет ожиданий завершения одной задачи до того, как будет получена следующая задача из очереди.
В качестве дополнительного подтверждения, если вы запустите это:
демонстрационный код:
import multiprocessing as mp
import time
import logging
def foo(x):
time.sleep(1)
return x*x
def blocks():
for x in range(1000):
if x%100==0:
logger.info('Got here')
yield x
logger=mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG)
pool=mp.Pool()
print pool.map(foo, blocks())
Вы увидите сообщение Got here
, напечатанное 10 раз почти сразу, а затем длинную паузу из-за вызова time.sleep(1)
в foo
. Это явно показывает, что итератор полностью потребляется задолго до того, как процессы пула приближаются к завершению задач.
Ответ 2
В принципе, когда вы перебираете такой файл, как вы, каждый раз, когда вы читаете новую строку из файла, вы перемещаете указатель файла вперед на одну строку.
Итак, когда вы делаете
block=itertools.takewhile(lambda x:x!='STOP',lines)
каждый раз, когда итератор, возвращаемый takewhile
, получает новый элемент из lines
, он перемещает указатель файла.
Как правило, плохо продвигать итератор, который вы уже зацикливаете в цикле for
. Тем не менее, цикл for
временно приостанавливается на каждом yield
, а map
исчерпывает takewhile
, прежде чем продолжить цикл for
, чтобы вы получили желаемое поведение.
Когда вы выполняете цикл for
и takewhile
одновременно, указатель файла быстро перемещается в конец, и вы получаете сообщение об ошибке.
Попробуйте это вместо этого, оно должно быть быстрее, чем обертка takewhile
в list
:
from contextlib import closing
from itertools import repeat
def block_generator(filename):
with open(filename) as infile:
for pos in (infile.tell() for line in infile if line == 'START'):
yield pos
def my_f_wrapper(pos, filename):
with open(filename) as infile:
infile.seek(pos)
block=itertools.takewhile(lambda x:x!='STOP', infile)
my_f(block)
blocks = block_generator(filename)
p.imap(my_f_wrapper, blocks, repeat(filename))
В принципе, вы хотите, чтобы каждый my_f
работал независимо от файла, поэтому вам нужно открыть файл независимо для каждого.
Я не могу придумать способ, который не требует повторения итерации файла дважды, один раз в цикле for
и один раз с помощью takewhile
все вместе, при одновременной обработке файла параллельно. В исходной версии takewhile
улучшил указатель файла для цикла for
, поэтому он был очень эффективным.
Если вы не повторяли строки, а просто байты, я бы рекомендовал использовать mmap, но это сделало бы вещи намного сложнее, если вы работаете со строками текста.
Изменить: Альтернативой может быть block_generator
пройти через файл и найти все позиции START
и STOP
, а затем передать их парами в оболочку. Таким образом, оболочке не пришлось бы сравнивать строки с STOP
, просто нужно было бы использовать tell()
в файле, чтобы убедиться, что он не находится в STOP
. Я не уверен, будет ли это быстрее.