Параллельная обработка большого .csv файла в Python

Я обрабатываю большие CSV файлы (порядка нескольких ГБ с 10 М строк) с помощью Python script.

Файлы имеют разные длины строк и не могут быть полностью загружены в память для анализа.

Каждая строка обрабатывается отдельно функцией в моем script. Для анализа одного файла требуется около 20 минут, и, похоже, скорость доступа к диску не является проблемой, а скорее вызовом обработки/функции.

Код выглядит примерно так (очень просто). Фактический код использует структуру класса, но это похоже:

csvReader = csv.reader(open("file","r")
for row in csvReader:
   handleRow(row, dataStructure)

Учитывая, что для вычисления требуется общая структура данных, какой был бы лучший способ параллельного анализа анализа на Python с использованием нескольких ядер?

В общем, как я могу прочитать сразу несколько строк из .csv в Python для передачи потоку/процессу? Цикл с for по строкам не очень эффективен.

Спасибо!

Ответы

Ответ 1

Попробуйте сравнительный анализ вашего файла и разбор каждой строки CSV, но ничего не делая с ним. Вы исключили доступ к диску, но вам все равно нужно выяснить, является ли синтаксический анализ CSV медленным, или если ваш собственный код является медленным.

Если это синтаксический анализ CSV медленный, вы можете застрять, потому что я не думаю, что есть способ перепрыгнуть в середину CSV файла без сканирования до этой точки.

Если это ваш собственный код, вы можете иметь один поток, читающий CSV файл, и вывод строк в очередь, а затем несколько потоков, обрабатывающих строки из этой очереди. Но не беспокойтесь об этом решении, если сам анализ CSV делает его медленным.

Ответ 2

Из-за GIL, потоки Python не будут ускорять вычисления, которые связаны с процессором, как это может быть с привязкой IO.

Вместо этого взгляните на модуль многопроцессорности, который может запускать ваш код на нескольких процессорах параллельно.

Ответ 3

Это может быть слишком поздно, но для будущих пользователей я все равно отправлю. Другой плакат упоминается с использованием многопроцессорности. Я могу ручаться за это и могу зайти подробнее. Python мы ежедневно обрабатываем файлы в сотнях MB/несколько GB каждый день. Так что это определенно зависит от задачи. Некоторые из файлов, с которыми мы имеем дело, - это не CSV, поэтому синтаксический анализ может быть довольно сложным и занимает больше времени, чем доступ к диску. Тем не менее, методология одинакова независимо от типа файла.

Вы можете обрабатывать фрагменты больших файлов одновременно. Здесь псевдо-код того, как мы это делаем:

import os, multiprocessing as mp

# process file function
def processfile(filename, start=0, stop=0):
    if start == 0 and stop == 0:
        ... process entire file...
    else:
        with open(file, 'r') as fh:
            fh.seek(start)
            lines = fh.readlines(stop - start)
            ... process these lines ...

    return results

if __name__ == "__main__":

    # get file size and set chuck size
    filesize = os.path.getsize(filename)
    split_size = 100*1024*1024

    # determine if it needs to be split
    if filesize > split_size:

        # create pool, initialize chunk start location (cursor)
        pool = mp.Pool(cpu_count)
        cursor = 0
        results = []
        with open(file, 'r') as fh:

            # for every chunk in the file...
            for chunk in xrange(filesize // split_size):

                # determine where the chunk ends, is it the last one?
                if cursor + split_size > filesize:
                    end = filesize
                else:
                    end = cursor + split_size

                # seek to end of chunk and read next line to ensure you 
                # pass entire lines to the processfile function
                fh.seek(end)
                fh.readline()

                # get current file location
                end = fh.tell()

                # add chunk to process pool, save reference to get results
                proc = pool.apply_async(processfile, args=[filename, cursor, end])
                results.append(proc)

                # setup next chunk
                cursor = end

        # close and wait for pool to finish
        pool.close()
        pool.join()

        # iterate through results
        for proc in results:
            processfile_result = proc.get()

    else:
        ...process normally...

Как я уже сказал, это только псевдокод. Это должно заставить каждого начать, кто должен что-то сделать. У меня нет кода передо мной, просто делаю это из памяти.

Но мы получили больше, чем 2-кратное ускорение от этого при первом запуске без тонкой настройки. Вы можете точно настроить количество процессов в пуле и увеличить объем кусков, чтобы получить еще большую скорость в зависимости от вашей настройки. Если у вас есть несколько файлов, как и мы, создайте пул для параллельного чтения нескольких файлов. Просто будьте осторожны, чтобы перегрузить коробку слишком многими процессами.

Примечание. Вы должны поместить его в блок "если главный", чтобы гарантировать, что бесконечные процессы не будут созданы.

Ответ 4

Если строки полностью независимы, просто разделите входной файл на столько файлов, сколько у вас есть. После этого вы можете запускать столько экземпляров процесса, сколько входных файлов, которые у вас есть. Эти экземпляры, поскольку они являются совершенно разными процессами, не будут связаны проблемами GIL.

Ответ 5

Если вы используете zmq и средний человек DEALER, вы сможете распространять обработку строк не только на процессоры вашего компьютера, но и через сеть до любого количества процессов по мере необходимости. Это по существу гарантирует, что вы нажмете ограничение ввода-вывода против предела ЦП:)