Ответ 1
Per комментарии,
мы хотим, чтобы каждый процесс работал на куске в 10000 строк. Это не слишком сложно
делать; см. рецепт iter/islice
ниже. Однако проблема с использованием
pool.map(worker, ten_thousand_row_chunks)
заключается в том, что pool.map
будет пытаться поместить все куски в очередь задач
сразу. Если для этого требуется больше памяти, чем доступно, вы получаете
MemoryError
. (Примечание: pool.imap
страдает от одной и той же проблемы.)
Поэтому вместо этого нужно называть pool.map
итеративно на куски каждого фрагмента.
import itertools as IT
import multiprocessing as mp
import csv
def worker(chunk):
return len(chunk)
def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5
pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()
main()
Каждый chunk
будет состоять из chunksize*num_procs
строк из файла.
Этого достаточно для того, чтобы все рабочие в пуле работали, но не слишком велики, чтобы вызвать MemoryError - при условии, что chunksize
не слишком большой.
Каждый chunk
затем разбивается на куски, причем каждая часть состоит из
chunksize
строки из файла. Эти части затем отправляются на pool.map
.
Как работает iter(lambda: list(IT.islice(iterator, chunksize)), [])
:
Это идиома для группировки итератора в куски длины chunksize. Посмотрим, как это работает на примере:
In [111]: iterator = iter(range(10))
Обратите внимание, что каждый раз, когда вызывается IT.islice(iterator, 3)
, новый фрагмент из 3 элементов
отрезается от итератора:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
Если на итераторе осталось меньше 3 элементов, возвращается только то, что осталось:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
И если вы снова вызовете его, вы получите пустой список:
In [116]: list(IT.islice(iterable, 3))
Out[116]: []
lambda: list(IT.islice(iterator, chunksize))
- это функция, которая возвращает list(IT.islice(iterator, chunksize))
при вызове. Это "однострочный", который эквивалентен
def func():
return list(IT.islice(iterator, chunksize))
Наконец, iter(callable, sentinel)
возвращает другой итератор. Значения, полученные этим итератором, являются значениями, возвращаемыми вызываемым. Он продолжает давать значения, пока вызываемый не возвращает значение, равное дозорному. Итак,
iter(lambda: list(IT.islice(iterator, chunksize)), [])
будет продолжать возвращать значения list(IT.islice(iterator, chunksize))
, пока это значение не будет пустым:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]