Обработка больших файлов в Python [1000 ГБ или более]

Допустим, у меня есть текстовый файл размером 1000 ГБ. Мне нужно найти, сколько раз фраза возникает в тексте.

Есть ли какой-нибудь более быстрый способ сделать это, чем тот, который я использую ниже? Сколько потребуется для выполнения задачи.

phrase = "how fast it is"
count = 0
with open('bigfile.txt') as f:
    for line in f:
        count += line.count(phrase)

Если я прав, если у меня нет этого файла в памяти, я бы подождал, пока компьютер загрузит файл каждый раз, когда я делаю поиск, и это займет не менее 4000 секунд для 250 МБ/сек. диск и файл размером 10000 ГБ.

Ответы

Ответ 1

Я использовал file.read() для чтения данных в кусках, в текущих примерах куски были размером 100 МБ, 500 МБ, 1 ГБ и 2 ГБ соответственно. Размер моего текстового файла - 2,1 ГБ.

Код:

 from functools import partial

 def read_in_chunks(size_in_bytes):

    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt', 'r+b') as f:
        prev = ''
        count = 0
        f_read  = partial(f.read, size_in_bytes)
        for text in iter(f_read, ''):
            if not text.endswith('\n'):
                # if file contains a partial line at the end, then don't
                # use it when counting the substring count. 
                text, rest = text.rsplit('\n', 1)
                # pre-pend the previous partial line if any.
                text =  prev + text
                prev = rest
            else:
                # if the text ends with a '\n' then simple pre-pend the
                # previous partial line. 
                text =  prev + text
                prev = ''
            count += text.count(s)
        count += prev.count(s)
        print count

Тайминги:

read_in_chunks(104857600)
$ time python so.py
10000000

real    0m1.649s
user    0m0.977s
sys     0m0.669s

read_in_chunks(524288000)
$ time python so.py
10000000

real    0m1.558s
user    0m0.893s
sys     0m0.646s

read_in_chunks(1073741824)
$ time python so.py
10000000

real    0m1.242s
user    0m0.689s
sys     0m0.549s


read_in_chunks(2147483648)
$ time python so.py
10000000

real    0m0.844s
user    0m0.415s
sys     0m0.408s

С другой стороны, простая версия цикла занимает около 6 секунд в моей системе:

def simple_loop():

    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt') as f:
        print sum(line.count(s) for line in f)

$ time python so.py
10000000

real    0m5.993s
user    0m5.679s
sys     0m0.313s

Результаты @SlaterTyranus grep версия в моем файле:

$ time grep -o 'Lets say i have a text file of 1000 GB' data.txt|wc -l
10000000

real    0m11.975s
user    0m11.779s
sys     0m0.568s

Результаты решения @woot :

$ time cat data.txt | parallel --block 10M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000

real    0m5.955s
user    0m14.825s
sys     0m5.766s

Получил лучшее время, когда я использовал 100 МБ в качестве размера блока:

$ time cat data.txt | parallel --block 100M --pipe grep -o 'Lets\ say\ i\ have\ a\ text\ file\ of\ 1000\ GB' | wc -l
10000000

real    0m4.632s
user    0m13.466s
sys     0m3.290s

Результаты woot второе решение:

$ time python woot_thread.py # CHUNK_SIZE = 1073741824
10000000

real    0m1.006s
user    0m0.509s
sys     0m2.171s
$ time python woot_thread.py  #CHUNK_SIZE = 2147483648
10000000

real    0m1.009s
user    0m0.495s
sys     0m2.144s

Системные характеристики: Core i5-4670, 7200 RPM HDD

Ответ 2

Вот попытка Python... Возможно, вам потребуется сыграть с THREADS и CHUNK_SIZE. Также это куча кода за короткое время, поэтому я, возможно, не думал обо всем. Я перекрываю свой буфер, но поймаю те, что находятся между ними, и я продлеваю последний фрагмент, чтобы включить оставшуюся часть файла.

import os
import threading

INPUTFILE ='bigfile.txt'
SEARCH_STRING='how fast it is'
THREADS = 8  # Set to 2 times number of cores, assuming hyperthreading
CHUNK_SIZE = 32768

FILESIZE = os.path.getsize(INPUTFILE)
SLICE_SIZE = FILESIZE / THREADS



class myThread (threading.Thread):
    def __init__(self, filehandle, seekspot):
        threading.Thread.__init__(self)
        self.filehandle = filehandle
        self.seekspot = seekspot
        self.cnt = 0
    def run(self):
        self.filehandle.seek( self.seekspot )

        p = self.seekspot
        if FILESIZE - self.seekspot < 2 * SLICE_SIZE:
            readend = FILESIZE
        else: 
            readend = self.seekspot + SLICE_SIZE + len(SEARCH_STRING) - 1
        overlap = ''
        while p < readend:
            if readend - p < CHUNK_SIZE:
                buffer = overlap + self.filehandle.read(readend - p)
            else:
                buffer = overlap + self.filehandle.read(CHUNK_SIZE)
            if buffer:
                self.cnt += buffer.count(SEARCH_STRING)
            overlap = buffer[len(buffer)-len(SEARCH_STRING)+1:]
            p += CHUNK_SIZE

filehandles = []
threads = []
for fh_idx in range(0,THREADS):
    filehandles.append(open(INPUTFILE,'rb'))
    seekspot = fh_idx * SLICE_SIZE
    threads.append(myThread(filehandles[fh_idx],seekspot ) )
    threads[fh_idx].start()

totalcount = 0 
for fh_idx in range(0,THREADS):
    threads[fh_idx].join()
    totalcount += threads[fh_idx].cnt

print totalcount

Ответ 3

Вы посмотрели на использование parallel/grep?

cat bigfile.txt | parallel --block 10M --pipe grep -o 'how\ fast\ it\ is' | wc -l

Ответ 4

Если бы вы рассматривали индексацию своего файла? Способ работы поисковой системы заключается в создании сопоставления слов с местоположением, которое они находятся в файле. Скажем, если у вас есть этот файл:

Foo bar baz dar. Dar bar haa.

Создается индекс, который выглядит так:

{
    "foo": {0},
    "bar": {4, 21},
    "baz": {8},
    "dar": {12, 17},
    "haa": {25},
}

В O (1) можно найти индекс хэш-таблицы; так что он быстро ускоряется.

И кто-то ищет запрос "bar baz", вы сначала разбиваете запрос на его составные слова: [ "bar", "baz" ], после чего вы обнаруживаете {4, 21}, {8}; то вы используете это, чтобы выпрыгнуть прямо в те места, где может существовать запрошенный текст.

Есть также готовые решения для индексированных поисковых систем; например Solr или ElasticSearch.

Ответ 5

Предлагаем сделать это с помощью grep вместо python. Будет быстрее, и вообще, если вы имеете дело с 1000 ГБ текста на своей локальной машине, вы сделали что-то не так, но все суждения в стороне, grep поставляется с несколькими вариантами, которые облегчат вашу жизнь.

grep -o '<your_phrase>' bigfile.txt|wc -l

В частности, это будет подсчитывать количество строк, в которых появляется желаемая фраза. Это также должно учитывать несколько случаев в одной строке.

Если вам не нужно, чтобы вы могли сделать что-то вроде этого:

grep -c '<your_phrase>' bigfile.txt

Ответ 6

Вот третий, более длинный метод, который использует базу данных. База данных обязательно будет больше, чем текст. Я не уверен в том, что индексы оптимальны, и небольшая экономия пространства может возникнуть от игры с этим немного. (например, возможно, WORD и POS, WORD лучше, или, возможно, WORD, POS просто отлично, нужно немного поэкспериментировать).

Это может плохо работать при тестировании 200 OK, хотя из-за большого количества повторяющегося текста, но может работать с более уникальными данными.

Сначала создайте базу данных, сканируя слова и т.д.:

import sqlite3
import re

INPUT_FILENAME = 'bigfile.txt'
DB_NAME = 'words.db'
FLUSH_X_WORDS=10000


conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()


cursor.execute("""
CREATE TABLE IF NOT EXISTS WORDS (
     POS INTEGER
    ,WORD TEXT
    ,PRIMARY KEY( POS, WORD )
) WITHOUT ROWID
""")

cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_WORD_POS
""")

cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_POS_WORD
""")


cursor.execute("""
DELETE FROM WORDS
""")

conn.commit()

def flush_words(words):
    for word in words.keys():
        for pos in words[word]:
            cursor.execute('INSERT INTO WORDS (POS, WORD) VALUES( ?, ? )', (pos, word.lower()) )

    conn.commit()

words = dict()
pos = 0
recomp = re.compile('\w+')
with open(INPUT_FILENAME, 'r') as f:
    for line in f:

        for word in [x.lower() for x in recomp.findall(line) if x]:
            pos += 1
            if words.has_key(word):
                words[word].append(pos)
            else:
                words[word] = [pos]
        if pos % FLUSH_X_WORDS == 0:
            flush_words(words)
            words = dict()
    if len(words) > 0:
        flush_words(words)
        words = dict()


cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_WORD_POS ON WORDS ( WORD, POS )
""")

cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_POS_WORD ON WORDS ( POS, WORD )
""")

cursor.execute("""
VACUUM
""")

cursor.execute("""
ANALYZE WORDS
""")

Затем выполните поиск базы данных, создав SQL:

import sqlite3
import re

SEARCH_PHRASE = 'how fast it is'
DB_NAME = 'words.db'


conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()

recomp = re.compile('\w+')

search_list = [x.lower() for x in recomp.findall(SEARCH_PHRASE) if x]

from_clause = 'FROM\n'
where_clause = 'WHERE\n'
num = 0
fsep = '     '
wsep = '     '
for word in search_list:
    num += 1
    from_clause += '{fsep}words w{num}\n'.format(fsep=fsep,num=num)
    where_clause += "{wsep} w{num}.word = '{word}'\n".format(wsep=wsep, num=num, word=word)
    if num > 1:
        where_clause += "  AND w{num}.pos = w{lastnum}.pos + 1\n".format(num=str(num),lastnum=str(num-1))

    fsep = '    ,'
    wsep = '  AND'


sql = """{select}{fromc}{where}""".format(select='SELECT COUNT(*)\n',fromc=from_clause, where=where_clause)

res = cursor.execute( sql )

print res.fetchone()[0] 

Ответ 7

Мы говорим о простом подсчете конкретной подстроки в довольно большом потоке данных. Задача почти наверняка связана с I/O, но очень легко распараллеливается. Первый уровень - это скорость чтения; мы можем уменьшить количество чтения с помощью сжатия или распределить скорость передачи, сохраняя данные в нескольких местах. Тогда у нас есть сам поиск; поиск подстрок - это хорошо известная проблема, опять же ограничение ввода-вывода. Если набор данных происходит от одного диска в значительной степени любой оптимизация не является спорной, так как там не так, что диск бьется одно ядро ​​в скорости.

Предполагая, что у нас есть куски, которые могут быть, например, отдельными блоками файла bzip2 (если мы используем поточный декомпрессор), полосами в RAID или распределенными узлами, мы можем многое получить от их обработки по отдельности. Выполняется поиск каждого фрагмента needle, тогда суставы могут быть сформированы путем принятия len(needle)-1 с конца одного фрагмента и начала следующего и поиска в них.

Быстрый тест показывает, что машины с регулярными выражениями работают быстрее, чем обычный оператор in:

>>> timeit.timeit("x.search(s)", "s='a'*500000; import re; x=re.compile('foobar')", number=20000)
17.146117210388184
>>> timeit.timeit("'foobar' in s", "s='a'*500000", number=20000)
24.263535976409912
>>> timeit.timeit("n in s", "s='a'*500000; n='foobar'", number=20000)
21.562405109405518

Еще один шаг оптимизации, который мы можем выполнить, учитывая, что мы имеем данные в файле, заключается в mmap вместо использования обычные операции чтения. Это позволяет операционной системе напрямую использовать дисковые буферы. Он также позволяет ядру удовлетворять несколько запросов на чтение в произвольном порядке без дополнительных системных вызовов, что позволяет использовать такие вещи, как базовый RAID, при работе в нескольких потоках.

Вот быстро протолкнул прототип. Очевидно, что некоторые вещи можно было бы улучшить, например, распределить куски, если у нас есть многоузловой кластер, выполняя проверку хвоста + головы, передавая один соседнему работнику (порядок, который неизвестен в этой реализации) вместо того, чтобы отправлять оба специального работника и реализации класса промежуточной очереди (pipe), а не для сопоставления семафоров. Вероятно, было бы также целесообразно перемещать рабочие потоки вне функции основного потока, поскольку основной поток продолжает изменять его локали.

from mmap import mmap, ALLOCATIONGRANULARITY, ACCESS_READ
from re import compile, escape
from threading import Semaphore, Thread
from collections import deque

def search(needle, filename):
    # Might want chunksize=RAID block size, threads
    chunksize=ALLOCATIONGRANULARITY*1024
    threads=32
    # Read chunk allowance
    allocchunks=Semaphore(threads)  # should maybe be larger
    chunkqueue=deque()   # Chunks mapped, read by workers
    chunksready=Semaphore(0)
    headtails=Semaphore(0)   # edges between chunks into special worker
    headtailq=deque()
    sumq=deque()     # worker final results

    # Note: although we do push and pop at differing ends of the
    # queues, we do not actually need to preserve ordering. 

    def headtailthread():
        # Since head+tail is 2*len(needle)-2 long, 
        # it cannot contain more than one needle
        htsum=0
        matcher=compile(escape(needle))
        heads={}
        tails={}
        while True:
            headtails.acquire()
            try:
                pos,head,tail=headtailq.popleft()
            except IndexError:
                break  # semaphore signaled without data, end of stream
            try:
                prevtail=tails.pop(pos-chunksize)
                if matcher.search(prevtail+head):
                    htsum+=1
            except KeyError:
                heads[pos]=head
            try:
                nexthead=heads.pop(pos+chunksize)
                if matcher.search(tail+nexthead):
                    htsum+=1
            except KeyError:
                tails[pos]=tail
        # No need to check spill tail and head as they are shorter than needle
        sumq.append(htsum)

    def chunkthread():
        threadsum=0
        # escape special characters to achieve fixed string search
        matcher=compile(escape(needle))
        borderlen=len(needle)-1
        while True:
            chunksready.acquire()
            try:
                pos,chunk=chunkqueue.popleft()
            except IndexError:   # End of stream
                break
            # Let the re module do the heavy lifting
            threadsum+=len(matcher.findall(chunk))
            if borderlen>0:
                # Extract the end pieces for checking borders
                head=chunk[:borderlen]
                tail=chunk[-borderlen:]
                headtailq.append((pos,head,tail))
                headtails.release()
            chunk.close()
            allocchunks.release()  # let main thread allocate another chunk
        sumq.append(threadsum)

    with infile=open(filename,'rb'):
        htt=Thread(target=headtailthread)
        htt.start()
        chunkthreads=[]
        for i in range(threads):
            t=Thread(target=chunkthread)
            t.start()
            chunkthreads.append(t)
        pos=0
        fileno=infile.fileno()
        while True:
            allocchunks.acquire()
            chunk=mmap(fileno, chunksize, access=ACCESS_READ, offset=pos)
            chunkqueue.append((pos,chunk))
            chunksready.release()
            pos+=chunksize
            if pos>chunk.size():   # Last chunk of file?
                break
        # File ended, finish all chunks
        for t in chunkthreads:
            chunksready.release()   # wake thread so it finishes
        for t in chunkthreads:
            t.join()    # wait for thread to finish
        headtails.release()     # post event to finish border checker
        htt.join()
        # All threads finished, collect our sum
        return sum(sumq)

if __name__=="__main__":
    from sys import argv
    print "Found string %d times"%search(*argv[1:])

Кроме того, изменение всего элемента для использования некоторой процедуры mapreduce (фрагменты карты, подсчеты, головы и хвосты, уменьшение путем суммирования счетчиков и проверка хвоста + частей головы) остается в качестве упражнения.

Изменить: поскольку кажется, что этот поиск будет повторяться с различными иглами, индекс будет намного быстрее, и он сможет пропустить поиск разделов, которые, как известно, не совпадают. Одна из возможностей заключается в создании карты, из которых блоки содержат любое появление различных n-граммов (с учетом границ блоков, позволяя ngram перекрываться в следующем); эти карты затем могут быть объединены, чтобы найти более сложные условия, прежде чем нужно будет загружать блоки исходных данных. Для этого есть базы данных; искать полнотекстовые поисковые системы.

Ответ 8

Я признаю, что grep будет быстрее. Я предполагаю, что этот файл представляет собой большой файл на основе строк.

Но вы могли бы сделать что-то подобное, если бы действительно хотели.

import os
import re
import mmap

fileName = 'bigfile.txt'
phrase = re.compile("how fast it is")

with open(fileName, 'r') as fHandle:
    data = mmap.mmap(fHandle.fileno(), os.path.getsize(fileName), access=mmap.ACCESS_READ)
    matches = re.match(phrase, data)
    print('matches = {0}'.format(matches.group()))