Самый быстрый способ обработки большого файла?
У меня есть несколько файлов по 3 ГБ с разделителями табуляции. В каждом файле 20 миллионов строк. Все строки должны быть обработаны независимо друг от друга, никаких связей между двумя строками нет. Мой вопрос в том, что будет быстрее А. Чтение построчно с помощью:
with open() as infile:
for line in infile:
Или B. Чтение файла в память кусками и его обработка, скажем, 250 МБ за раз?
Обработка не очень сложно, я просто захватывая значение в column1 для List1
, column2 к List2
и т.д. Может нужно добавить несколько значений столбцов вместе.
Я использую Python 2.7 на Linux-коробке, которая имеет 30 ГБ памяти. ASCII Text.
Есть ли способ ускорить процесс параллельно? Прямо сейчас я использую первый метод, и процесс очень медленный. CSVReader
ли использование какого- CSVReader
модуля CSVReader
? Мне не нужно делать это на python, приветствуются любые другие идеи использования языка или базы данных.
Ответы
Ответ 1
Похоже, что ваш код связан с I/O. Это означает, что многопроцессорность не поможет - если вы тратите 90% своего времени на чтение с диска, наличие дополнительных 7 процессов, ожидающих следующего чтения, ничего не поможет.
И, хотя использование модуля чтения CSV (будь то stdlib csv
или что-то вроде NumPy или Pandas), может быть хорошей идеей для простоты, вряд ли это сильно повлияет на производительность.
Тем не менее, стоит проверить, что вы действительно связаны с I/O, а не просто гадать. Запустите свою программу и посмотрите, приближается ли ваш процессор к 0% или близко к 100% или к ядру. Сделайте то, что предложил Амадан в комментарии, и запустите свою программу всего за pass
для обработки и посмотрите, отменит ли это 5% времени или 70%. Вы даже можете попробовать сравнить с циклом над os.open
и os.read(1024*1024)
или что-то еще, и посмотреть, будет ли это быстрее.
Поскольку вы используете Python 2.x, Python полагается на библиотеку C stdio, чтобы угадать, сколько буферов за раз, поэтому может стоить заставлять его буферизировать больше. Самый простой способ сделать это - использовать readlines(bufsize)
для некоторого большого bufsize
. (Вы можете попробовать разные цифры и измерить их, чтобы увидеть, где находится пик. По моему опыту, обычно что-либо из 64K-8MB примерно одинаково, но в зависимости от вашей системы, которая может быть другой, особенно если вы, например, читаете отключить сетевую файловую систему с большой пропускной способностью, но ужасную задержку, которая увеличивает пропускную способность и задержку реального физического диска и кэширование ОС).
Итак, например:
bufsize = 65536
with open(path) as infile:
while True:
lines = infile.readlines(bufsize)
if not lines:
break
for line in lines:
process(line)
Между тем, если вы используете 64-битную систему, вы можете попробовать использовать mmap
вместо того, чтобы читать файл в первое место. Это, конечно же, не гарантирует, что будет лучше, но может быть лучше, в зависимости от вашей системы. Например:
with open(path) as infile:
m = mmap.mmap(infile, 0, access=mmap.ACCESS_READ)
A Python mmap
является своего рода странным объектом - он действует как str
и как a file
одновременно, так что вы можете, например, вручную перебирать сканирование для строк новой строки или вызывать call readline
на нем, как на файл. Обе из них потребуют большей обработки от Python, чем повторение файла в виде строк или выполнение пакета readlines
(поскольку цикл, который будет в C, теперь находится в чистом Python... хотя, возможно, вы можете обойти это с помощью re
или с помощью простое расширение Cython?)... но преимущество ввода-вывода ОС, зная, что вы делаете с отображением, может заглушить недостаток процессора.
К сожалению, Python не раскрывает вызов madvise
, который вы использовали бы для настройки, пытаясь оптимизировать это в C (например, явно устанавливая MADV_SEQUENTIAL
вместо того, чтобы сделать ядро догадываться или форсировать прозрачные огромные страницы), - но вы действительно можете ctypes
выполнить функцию из libc
.
Ответ 2
Я знаю, что этот вопрос старый; но я хотел сделать то же самое, я создал простую структуру, которая помогает вам читать и обрабатывать большой файл параллельно. Оставляя то, что я пытался ответить.
Это код, я приведу пример в конце
def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
"""
function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned
Params :
fname : path to the file to be chunked
size : size of each chink is ~> this
skiplines : number of lines in the begining to skip, -1 means don't skip any lines
Returns :
start and end position of chunks in Bytes
"""
chunks = []
fileEnd = os.path.getsize(fname)
with open(fname, "rb") as f:
if(skiplines > 0):
for i in range(skiplines):
f.readline()
chunkEnd = f.tell()
count = 0
while True:
chunkStart = chunkEnd
f.seek(f.tell() + size, os.SEEK_SET)
f.readline() # make this chunk line aligned
chunkEnd = f.tell()
chunks.append((chunkStart, chunkEnd - chunkStart, fname))
count+=1
if chunkEnd > fileEnd:
break
return chunks
def parallel_apply_line_by_line_chunk(chunk_data):
"""
function to apply a function to each line in a chunk
Params :
chunk_data : the data for this chunk
Returns :
list of the non-None results for this chunk
"""
chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
func_args = chunk_data[4:]
t1 = time.time()
chunk_res = []
with open(file_path, "rb") as f:
f.seek(chunk_start)
cont = f.read(chunk_size).decode(encoding='utf-8')
lines = cont.splitlines()
for i,line in enumerate(lines):
ret = func_apply(line, *func_args)
if(ret != None):
chunk_res.append(ret)
return chunk_res
def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
"""
function to apply a supplied function line by line in parallel
Params :
input_file_path : path to input file
chunk_size_factor : size of 1 chunk in MB
num_procs : number of parallel processes to spawn, max used is num of available cores - 1
skiplines : number of top lines to skip while processing
func_apply : a function which expects a line and outputs None for lines we don't want processed
func_args : arguments to function func_apply
fout : do we want to output the processed lines to a file
Returns :
list of the non-None results obtained be processing each line
"""
num_parallel = min(num_procs, psutil.cpu_count()) - 1
jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)
jobs = [list(x) + [func_apply] + func_args for x in jobs]
print("Starting the parallel pool for {} jobs ".format(len(jobs)))
lines_counter = 0
pool = mp.Pool(num_parallel, maxtasksperchild=1000) # maxtaskperchild - if not supplied some weird happend and memory blows as the processes keep on lingering
outputs = []
for i in range(0, len(jobs), num_parallel):
print("Chunk start = ", i)
t1 = time.time()
chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])
for i, subl in enumerate(chunk_outputs):
for x in subl:
if(fout != None):
print(x, file=fout)
else:
outputs.append(x)
lines_counter += 1
del(chunk_outputs)
gc.collect()
print("All Done in time ", time.time() - t1)
print("Total lines we have = {}".format(lines_counter))
pool.close()
pool.terminate()
return outputs
Скажем, например, у меня есть файл, в котором я хочу посчитать количество слов в каждой строке, тогда обработка каждой строки будет выглядеть как
def count_words_line(line):
return len(line.strip().split())
а затем вызвать функцию как:
parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)
Используя это, я получаю скорость в ~ 8 раз выше по сравнению с обычным построчным чтением в файле-образце размером ~ 20 ГБ, в котором я выполняю некоторую умеренно сложную обработку каждой строки.