Параллельная обработка большого .csv файла в Python
Я обрабатываю большие CSV файлы (порядка нескольких ГБ с 10 М строк) с помощью Python script.
Файлы имеют разные длины строк и не могут быть полностью загружены в память для анализа.
Каждая строка обрабатывается отдельно функцией в моем script. Для анализа одного файла требуется около 20 минут, и, похоже, скорость доступа к диску не является проблемой, а скорее вызовом обработки/функции.
Код выглядит примерно так (очень просто). Фактический код использует структуру класса, но это похоже:
csvReader = csv.reader(open("file","r")
for row in csvReader:
handleRow(row, dataStructure)
Учитывая, что для вычисления требуется общая структура данных, какой был бы лучший способ параллельного анализа анализа на Python с использованием нескольких ядер?
В общем, как я могу прочитать сразу несколько строк из .csv в Python для передачи потоку/процессу? Цикл с for
по строкам не очень эффективен.
Спасибо!
Ответы
Ответ 1
Попробуйте сравнительный анализ вашего файла и разбор каждой строки CSV, но ничего не делая с ним. Вы исключили доступ к диску, но вам все равно нужно выяснить, является ли синтаксический анализ CSV медленным, или если ваш собственный код является медленным.
Если это синтаксический анализ CSV медленный, вы можете застрять, потому что я не думаю, что есть способ перепрыгнуть в середину CSV файла без сканирования до этой точки.
Если это ваш собственный код, вы можете иметь один поток, читающий CSV файл, и вывод строк в очередь, а затем несколько потоков, обрабатывающих строки из этой очереди. Но не беспокойтесь об этом решении, если сам анализ CSV делает его медленным.
Ответ 2
Из-за GIL, потоки Python не будут ускорять вычисления, которые связаны с процессором, как это может быть с привязкой IO.
Вместо этого взгляните на модуль многопроцессорности, который может запускать ваш код на нескольких процессорах параллельно.
Ответ 3
Это может быть слишком поздно, но для будущих пользователей я все равно отправлю. Другой плакат упоминается с использованием многопроцессорности. Я могу ручаться за это и могу зайти подробнее. Python мы ежедневно обрабатываем файлы в сотнях MB/несколько GB каждый день. Так что это определенно зависит от задачи. Некоторые из файлов, с которыми мы имеем дело, - это не CSV, поэтому синтаксический анализ может быть довольно сложным и занимает больше времени, чем доступ к диску. Тем не менее, методология одинакова независимо от типа файла.
Вы можете обрабатывать фрагменты больших файлов одновременно. Здесь псевдо-код того, как мы это делаем:
import os, multiprocessing as mp
# process file function
def processfile(filename, start=0, stop=0):
if start == 0 and stop == 0:
... process entire file...
else:
with open(file, 'r') as fh:
fh.seek(start)
lines = fh.readlines(stop - start)
... process these lines ...
return results
if __name__ == "__main__":
# get file size and set chuck size
filesize = os.path.getsize(filename)
split_size = 100*1024*1024
# determine if it needs to be split
if filesize > split_size:
# create pool, initialize chunk start location (cursor)
pool = mp.Pool(cpu_count)
cursor = 0
results = []
with open(file, 'r') as fh:
# for every chunk in the file...
for chunk in xrange(filesize // split_size):
# determine where the chunk ends, is it the last one?
if cursor + split_size > filesize:
end = filesize
else:
end = cursor + split_size
# seek to end of chunk and read next line to ensure you
# pass entire lines to the processfile function
fh.seek(end)
fh.readline()
# get current file location
end = fh.tell()
# add chunk to process pool, save reference to get results
proc = pool.apply_async(processfile, args=[filename, cursor, end])
results.append(proc)
# setup next chunk
cursor = end
# close and wait for pool to finish
pool.close()
pool.join()
# iterate through results
for proc in results:
processfile_result = proc.get()
else:
...process normally...
Как я уже сказал, это только псевдокод. Это должно заставить каждого начать, кто должен что-то сделать. У меня нет кода передо мной, просто делаю это из памяти.
Но мы получили больше, чем 2-кратное ускорение от этого при первом запуске без тонкой настройки. Вы можете точно настроить количество процессов в пуле и увеличить объем кусков, чтобы получить еще большую скорость в зависимости от вашей настройки. Если у вас есть несколько файлов, как и мы, создайте пул для параллельного чтения нескольких файлов. Просто будьте осторожны, чтобы перегрузить коробку слишком многими процессами.
Примечание. Вы должны поместить его в блок "если главный", чтобы гарантировать, что бесконечные процессы не будут созданы.
Ответ 4
Если строки полностью независимы, просто разделите входной файл на столько файлов, сколько у вас есть. После этого вы можете запускать столько экземпляров процесса, сколько входных файлов, которые у вас есть. Эти экземпляры, поскольку они являются совершенно разными процессами, не будут связаны проблемами GIL.
Ответ 5
Если вы используете zmq и средний человек DEALER, вы сможете распространять обработку строк не только на процессоры вашего компьютера, но и через сеть до любого количества процессов по мере необходимости. Это по существу гарантирует, что вы нажмете ограничение ввода-вывода против предела ЦП:)