Обработка больших файлов в Python [1000 ГБ или более]
Допустим, у меня есть текстовый файл размером 1000 ГБ. Мне нужно найти, сколько раз фраза возникает в тексте.
Есть ли какой-нибудь более быстрый способ сделать это, чем тот, который я использую ниже?
Сколько потребуется для выполнения задачи.
phrase = "how fast it is"
count = 0
with open('bigfile.txt') as f:
for line in f:
count += line.count(phrase)
Если я прав, если у меня нет этого файла в памяти, я бы подождал, пока компьютер загрузит файл каждый раз, когда я делаю поиск, и это займет не менее 4000 секунд для 250 МБ/сек. диск и файл размером 10000 ГБ.
Ответы
Ответ 1
Я использовал file.read()
для чтения данных в кусках, в текущих примерах куски были размером 100 МБ, 500 МБ, 1 ГБ и 2 ГБ соответственно. Размер моего текстового файла - 2,1 ГБ.
Код:
from functools import partial
def read_in_chunks(size_in_bytes):
s = 'Lets say i have a text file of 1000 GB'
with open('data.txt', 'r+b') as f:
prev = ''
count = 0
f_read = partial(f.read, size_in_bytes)
for text in iter(f_read, ''):
if not text.endswith('\n'):
# if file contains a partial line at the end, then don't
# use it when counting the substring count.
text, rest = text.rsplit('\n', 1)
# pre-pend the previous partial line if any.
text = prev + text
prev = rest
else:
# if the text ends with a '\n' then simple pre-pend the
# previous partial line.
text = prev + text
prev = ''
count += text.count(s)
count += prev.count(s)
print count
Тайминги:
read_in_chunks(104857600)
$ time python so.py
10000000
real 0m1.649s
user 0m0.977s
sys 0m0.669s
read_in_chunks(524288000)
$ time python so.py
10000000
real 0m1.558s
user 0m0.893s
sys 0m0.646s
read_in_chunks(1073741824)
$ time python so.py
10000000
real 0m1.242s
user 0m0.689s
sys 0m0.549s
read_in_chunks(2147483648)
$ time python so.py
10000000
real 0m0.844s
user 0m0.415s
sys 0m0.408s
С другой стороны, простая версия цикла занимает около 6 секунд в моей системе:
def simple_loop():
s = 'Lets say i have a text file of 1000 GB'
with open('data.txt') as f:
print sum(line.count(s) for line in f)
$ time python so.py
10000000
real 0m5.993s
user 0m5.679s
sys 0m0.313s
Результаты @SlaterTyranus grep
версия в моем файле:
$ time grep -o 'Lets say i have a text file of 1000 GB' data.txt|wc -l
10000000
real 0m11.975s
user 0m11.779s
sys 0m0.568s
Результаты решения @woot :
$ time cat data.txt | parallel --block 10M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000
real 0m5.955s
user 0m14.825s
sys 0m5.766s
Получил лучшее время, когда я использовал 100 МБ в качестве размера блока:
$ time cat data.txt | parallel --block 100M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000
real 0m4.632s
user 0m13.466s
sys 0m3.290s
Результаты woot второе решение:
$ time python woot_thread.py # CHUNK_SIZE = 1073741824
10000000
real 0m1.006s
user 0m0.509s
sys 0m2.171s
$ time python woot_thread.py #CHUNK_SIZE = 2147483648
10000000
real 0m1.009s
user 0m0.495s
sys 0m2.144s
Системные характеристики: Core i5-4670, 7200 RPM HDD
Ответ 2
Вот попытка Python... Возможно, вам потребуется сыграть с THREADS и CHUNK_SIZE. Также это куча кода за короткое время, поэтому я, возможно, не думал обо всем. Я перекрываю свой буфер, но поймаю те, что находятся между ними, и я продлеваю последний фрагмент, чтобы включить оставшуюся часть файла.
import os
import threading
INPUTFILE ='bigfile.txt'
SEARCH_STRING='how fast it is'
THREADS = 8 # Set to 2 times number of cores, assuming hyperthreading
CHUNK_SIZE = 32768
FILESIZE = os.path.getsize(INPUTFILE)
SLICE_SIZE = FILESIZE / THREADS
class myThread (threading.Thread):
def __init__(self, filehandle, seekspot):
threading.Thread.__init__(self)
self.filehandle = filehandle
self.seekspot = seekspot
self.cnt = 0
def run(self):
self.filehandle.seek( self.seekspot )
p = self.seekspot
if FILESIZE - self.seekspot < 2 * SLICE_SIZE:
readend = FILESIZE
else:
readend = self.seekspot + SLICE_SIZE + len(SEARCH_STRING) - 1
overlap = ''
while p < readend:
if readend - p < CHUNK_SIZE:
buffer = overlap + self.filehandle.read(readend - p)
else:
buffer = overlap + self.filehandle.read(CHUNK_SIZE)
if buffer:
self.cnt += buffer.count(SEARCH_STRING)
overlap = buffer[len(buffer)-len(SEARCH_STRING)+1:]
p += CHUNK_SIZE
filehandles = []
threads = []
for fh_idx in range(0,THREADS):
filehandles.append(open(INPUTFILE,'rb'))
seekspot = fh_idx * SLICE_SIZE
threads.append(myThread(filehandles[fh_idx],seekspot ) )
threads[fh_idx].start()
totalcount = 0
for fh_idx in range(0,THREADS):
threads[fh_idx].join()
totalcount += threads[fh_idx].cnt
print totalcount
Ответ 3
Вы посмотрели на использование parallel/grep?
cat bigfile.txt | parallel --block 10M --pipe grep -o 'how\ fast\ it\ is' | wc -l
Ответ 4
Если бы вы рассматривали индексацию своего файла? Способ работы поисковой системы заключается в создании сопоставления слов с местоположением, которое они находятся в файле. Скажем, если у вас есть этот файл:
Foo bar baz dar. Dar bar haa.
Создается индекс, который выглядит так:
{
"foo": {0},
"bar": {4, 21},
"baz": {8},
"dar": {12, 17},
"haa": {25},
}
В O (1) можно найти индекс хэш-таблицы; так что он быстро ускоряется.
И кто-то ищет запрос "bar baz", вы сначала разбиваете запрос на его составные слова: [ "bar", "baz" ], после чего вы обнаруживаете {4, 21}, {8}; то вы используете это, чтобы выпрыгнуть прямо в те места, где может существовать запрошенный текст.
Есть также готовые решения для индексированных поисковых систем; например Solr или ElasticSearch.
Ответ 5
Предлагаем сделать это с помощью grep
вместо python. Будет быстрее, и вообще, если вы имеете дело с 1000 ГБ текста на своей локальной машине, вы сделали что-то не так, но все суждения в стороне, grep
поставляется с несколькими вариантами, которые облегчат вашу жизнь.
grep -o '<your_phrase>' bigfile.txt|wc -l
В частности, это будет подсчитывать количество строк, в которых появляется желаемая фраза. Это также должно учитывать несколько случаев в одной строке.
Если вам не нужно, чтобы вы могли сделать что-то вроде этого:
grep -c '<your_phrase>' bigfile.txt
Ответ 6
Вот третий, более длинный метод, который использует базу данных. База данных обязательно будет больше, чем текст. Я не уверен в том, что индексы оптимальны, и небольшая экономия пространства может возникнуть от игры с этим немного. (например, возможно, WORD и POS, WORD лучше, или, возможно, WORD, POS просто отлично, нужно немного поэкспериментировать).
Это может плохо работать при тестировании 200 OK, хотя из-за большого количества повторяющегося текста, но может работать с более уникальными данными.
Сначала создайте базу данных, сканируя слова и т.д.:
import sqlite3
import re
INPUT_FILENAME = 'bigfile.txt'
DB_NAME = 'words.db'
FLUSH_X_WORDS=10000
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS WORDS (
POS INTEGER
,WORD TEXT
,PRIMARY KEY( POS, WORD )
) WITHOUT ROWID
""")
cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_WORD_POS
""")
cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_POS_WORD
""")
cursor.execute("""
DELETE FROM WORDS
""")
conn.commit()
def flush_words(words):
for word in words.keys():
for pos in words[word]:
cursor.execute('INSERT INTO WORDS (POS, WORD) VALUES( ?, ? )', (pos, word.lower()) )
conn.commit()
words = dict()
pos = 0
recomp = re.compile('\w+')
with open(INPUT_FILENAME, 'r') as f:
for line in f:
for word in [x.lower() for x in recomp.findall(line) if x]:
pos += 1
if words.has_key(word):
words[word].append(pos)
else:
words[word] = [pos]
if pos % FLUSH_X_WORDS == 0:
flush_words(words)
words = dict()
if len(words) > 0:
flush_words(words)
words = dict()
cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_WORD_POS ON WORDS ( WORD, POS )
""")
cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_POS_WORD ON WORDS ( POS, WORD )
""")
cursor.execute("""
VACUUM
""")
cursor.execute("""
ANALYZE WORDS
""")
Затем выполните поиск базы данных, создав SQL:
import sqlite3
import re
SEARCH_PHRASE = 'how fast it is'
DB_NAME = 'words.db'
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
recomp = re.compile('\w+')
search_list = [x.lower() for x in recomp.findall(SEARCH_PHRASE) if x]
from_clause = 'FROM\n'
where_clause = 'WHERE\n'
num = 0
fsep = ' '
wsep = ' '
for word in search_list:
num += 1
from_clause += '{fsep}words w{num}\n'.format(fsep=fsep,num=num)
where_clause += "{wsep} w{num}.word = '{word}'\n".format(wsep=wsep, num=num, word=word)
if num > 1:
where_clause += " AND w{num}.pos = w{lastnum}.pos + 1\n".format(num=str(num),lastnum=str(num-1))
fsep = ' ,'
wsep = ' AND'
sql = """{select}{fromc}{where}""".format(select='SELECT COUNT(*)\n',fromc=from_clause, where=where_clause)
res = cursor.execute( sql )
print res.fetchone()[0]
Ответ 7
Мы говорим о простом подсчете конкретной подстроки в довольно большом потоке данных. Задача почти наверняка связана с I/O, но очень легко распараллеливается. Первый уровень - это скорость чтения; мы можем уменьшить количество чтения с помощью сжатия или распределить скорость передачи, сохраняя данные в нескольких местах. Тогда у нас есть сам поиск; поиск подстрок - это хорошо известная проблема, опять же ограничение ввода-вывода. Если набор данных происходит от одного диска в значительной степени любой оптимизация не является спорной, так как там не так, что диск бьется одно ядро в скорости.
Предполагая, что у нас есть куски, которые могут быть, например, отдельными блоками файла bzip2 (если мы используем поточный декомпрессор), полосами в RAID или распределенными узлами, мы можем многое получить от их обработки по отдельности. Выполняется поиск каждого фрагмента needle
, тогда суставы могут быть сформированы путем принятия len(needle)-1
с конца одного фрагмента и начала следующего и поиска в них.
Быстрый тест показывает, что машины с регулярными выражениями работают быстрее, чем обычный оператор in
:
>>> timeit.timeit("x.search(s)", "s='a'*500000; import re; x=re.compile('foobar')", number=20000)
17.146117210388184
>>> timeit.timeit("'foobar' in s", "s='a'*500000", number=20000)
24.263535976409912
>>> timeit.timeit("n in s", "s='a'*500000; n='foobar'", number=20000)
21.562405109405518
Еще один шаг оптимизации, который мы можем выполнить, учитывая, что мы имеем данные в файле, заключается в mmap вместо использования обычные операции чтения. Это позволяет операционной системе напрямую использовать дисковые буферы. Он также позволяет ядру удовлетворять несколько запросов на чтение в произвольном порядке без дополнительных системных вызовов, что позволяет использовать такие вещи, как базовый RAID, при работе в нескольких потоках.
Вот быстро протолкнул прототип. Очевидно, что некоторые вещи можно было бы улучшить, например, распределить куски, если у нас есть многоузловой кластер, выполняя проверку хвоста + головы, передавая один соседнему работнику (порядок, который неизвестен в этой реализации) вместо того, чтобы отправлять оба специального работника и реализации класса промежуточной очереди (pipe), а не для сопоставления семафоров. Вероятно, было бы также целесообразно перемещать рабочие потоки вне функции основного потока, поскольку основной поток продолжает изменять его локали.
from mmap import mmap, ALLOCATIONGRANULARITY, ACCESS_READ
from re import compile, escape
from threading import Semaphore, Thread
from collections import deque
def search(needle, filename):
# Might want chunksize=RAID block size, threads
chunksize=ALLOCATIONGRANULARITY*1024
threads=32
# Read chunk allowance
allocchunks=Semaphore(threads) # should maybe be larger
chunkqueue=deque() # Chunks mapped, read by workers
chunksready=Semaphore(0)
headtails=Semaphore(0) # edges between chunks into special worker
headtailq=deque()
sumq=deque() # worker final results
# Note: although we do push and pop at differing ends of the
# queues, we do not actually need to preserve ordering.
def headtailthread():
# Since head+tail is 2*len(needle)-2 long,
# it cannot contain more than one needle
htsum=0
matcher=compile(escape(needle))
heads={}
tails={}
while True:
headtails.acquire()
try:
pos,head,tail=headtailq.popleft()
except IndexError:
break # semaphore signaled without data, end of stream
try:
prevtail=tails.pop(pos-chunksize)
if matcher.search(prevtail+head):
htsum+=1
except KeyError:
heads[pos]=head
try:
nexthead=heads.pop(pos+chunksize)
if matcher.search(tail+nexthead):
htsum+=1
except KeyError:
tails[pos]=tail
# No need to check spill tail and head as they are shorter than needle
sumq.append(htsum)
def chunkthread():
threadsum=0
# escape special characters to achieve fixed string search
matcher=compile(escape(needle))
borderlen=len(needle)-1
while True:
chunksready.acquire()
try:
pos,chunk=chunkqueue.popleft()
except IndexError: # End of stream
break
# Let the re module do the heavy lifting
threadsum+=len(matcher.findall(chunk))
if borderlen>0:
# Extract the end pieces for checking borders
head=chunk[:borderlen]
tail=chunk[-borderlen:]
headtailq.append((pos,head,tail))
headtails.release()
chunk.close()
allocchunks.release() # let main thread allocate another chunk
sumq.append(threadsum)
with infile=open(filename,'rb'):
htt=Thread(target=headtailthread)
htt.start()
chunkthreads=[]
for i in range(threads):
t=Thread(target=chunkthread)
t.start()
chunkthreads.append(t)
pos=0
fileno=infile.fileno()
while True:
allocchunks.acquire()
chunk=mmap(fileno, chunksize, access=ACCESS_READ, offset=pos)
chunkqueue.append((pos,chunk))
chunksready.release()
pos+=chunksize
if pos>chunk.size(): # Last chunk of file?
break
# File ended, finish all chunks
for t in chunkthreads:
chunksready.release() # wake thread so it finishes
for t in chunkthreads:
t.join() # wait for thread to finish
headtails.release() # post event to finish border checker
htt.join()
# All threads finished, collect our sum
return sum(sumq)
if __name__=="__main__":
from sys import argv
print "Found string %d times"%search(*argv[1:])
Кроме того, изменение всего элемента для использования некоторой процедуры mapreduce (фрагменты карты, подсчеты, головы и хвосты, уменьшение путем суммирования счетчиков и проверка хвоста + частей головы) остается в качестве упражнения.
Изменить: поскольку кажется, что этот поиск будет повторяться с различными иглами, индекс будет намного быстрее, и он сможет пропустить поиск разделов, которые, как известно, не совпадают. Одна из возможностей заключается в создании карты, из которых блоки содержат любое появление различных n-граммов (с учетом границ блоков, позволяя ngram перекрываться в следующем); эти карты затем могут быть объединены, чтобы найти более сложные условия, прежде чем нужно будет загружать блоки исходных данных. Для этого есть базы данных; искать полнотекстовые поисковые системы.
Ответ 8
Я признаю, что grep будет быстрее. Я предполагаю, что этот файл представляет собой большой файл на основе строк.
Но вы могли бы сделать что-то подобное, если бы действительно хотели.
import os
import re
import mmap
fileName = 'bigfile.txt'
phrase = re.compile("how fast it is")
with open(fileName, 'r') as fHandle:
data = mmap.mmap(fHandle.fileno(), os.path.getsize(fileName), access=mmap.ACCESS_READ)
matches = re.match(phrase, data)
print('matches = {0}'.format(matches.group()))