Разбор очень больших файлов CSV с С++

Моя цель - проанализировать большие файлы csv с помощью С++ в проекте QT в среде OSX. (Когда я говорю csv, я имею в виду tsv и другие варианты 1GB ~ 5GB).

Это похоже на простую задачу, но все усложняется, когда размер файлов становится больше. Я не хочу писать свой собственный синтаксический анализатор из-за множества краевых случаев, связанных с разбором файлов csv.

Я нашел различные библиотеки обработки csv для обработки этого задания, но разбор 1GB файла занимает около 90-120 секунд на моей машине, что неприемлемо. я ничего не делаю с данными прямо сейчас, я просто обрабатываю и отбрасываю данные для целей тестирования.

cccsvparser - одна из библиотек, которые я пробовал. Но единственной достаточно быстрой библиотекой была fast-cpp-csv-parser, которая дает приемлемые результаты: 15 секунд на моей машине, но она работает только тогда, когда структура файла Известно.

Пример использования: fast-cpp-csv-parser

#include "csv.h"

int main(){
    io::CSVReader<3> in("ram.csv");
    in.read_header(io::ignore_extra_column, "vendor", "size", "speed");
    std::string vendor; int size; double speed;
    while(in.read_row(vendor, size, speed)){
    // do stuff with the data
    }
}

Как вы можете видеть, я не могу загрузить произвольные файлы, и я должен определенно определять переменные в соответствии с моей файловой структурой. Я не знаю ни одного метода, который позволяет мне динамически создавать эти переменные во время выполнения.

Другой подход, который я пробовал, - это прочитать файл csv по строке fast-cpp-csv-parser класс LineReader, который очень быстро (около 7 secs, чтобы прочитать весь файл), а затем проанализировать каждую строку с помощью cccsvparser lib, которая может обрабатывать строки. но это занимает около 40 секунд до завершения, это улучшение по сравнению с первыми попытками, но все же неприемлемо.

Я видел различные вопросы stackoverflow, связанные с разбором файлов csv, ни один из них не требует большой обработки файлов для учетной записи.

Также я потратил много времени на поиски решения этой проблемы, и я действительно скучаю по свободе, которую менеджеры пакетов предпочитают npm или pip при поиске из готовых решений.

Я буду благодарен за любое предложение о том, как справиться с этой проблемой.

Edit:

При использовании подхода @fbucek время обработки сокращается до 25 секунд, что является большим улучшением.

можем ли мы оптимизировать это еще больше?

Ответы

Ответ 1

Я предполагаю, что вы используете только один поток.

Многопоточность может ускорить процесс.

Самое лучшее достижение пока 40 секунд. Давайте придерживаться этого.

Я предположил, что сначала вы читаете, затем обрабатываете → (около 7 секунд для чтения всего файла)

7 секунд для чтения 33 с для обработки

Прежде всего , вы можете разделить файл на куски, допустим, 50 МБ. Это означает, что вы можете начать обработку после чтения 50 МБ файла. Вам не нужно ждать завершения всего файла. Это 0,35 с для чтения (теперь это 0,35 + 33 секунды для обработки = примерно 34 сек)

Когда вы используете Multithreading, , вы можете обрабатывать несколько фрагментов за раз. Это может ускорить процесс теоретически до количества ваших ядер. Скажем, у вас 4 ядра. Это 33/4 = 8,25 с.

Я думаю, вы можете ускорить обработку с 4 ядрами до 9 с..

Посмотрите QThreadPool и QRunnable или QtConcurrent Я бы предпочел QThreadPool

Разделите задачу на части:

  • Сначала попробуйте перебрать файл и разделить его на куски. И ничего не делайте с этим.
  • Затем создайте класс "ChunkProcessor", который может обрабатывать этот фрагмент
  • Сделайте "ChunkProcessor" подклассом QRunnable и в переопределенной функции run() выполните ваш процесс
  • Если у вас есть куски, у вас есть класс, который может обрабатывать их, а этот класс совместим с QThreadPool, вы можете передать его в

Это может выглядеть так:

loopoverfile {
  whenever chunk is ready {
     ChunkProcessor *chunkprocessor = new ChunkProcessor(chunk);
     QThreadPool::globalInstance()->start(chunkprocessor);
     connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
  }   
}

Вы можете использовать std:: share_ptr для передачи обработанных данных, чтобы не использовать QMutex или что-то еще и избежать проблем с сериализацией с несколькими потоками доступа к некоторому ресурсу.

Примечание: для использования пользовательского сигнала вам необходимо зарегистрировать его перед использованием

qRegisterMetaType<std::shared_ptr<ProcessedData>>("std::shared_ptr<ProcessedData>");

Изменить: (на основе обсуждения мой ответ был неясно об этом) Неважно, какой диск вы используете или насколько это быстро. Чтение - операция с одним потоком. Это решение было предложено только потому, что потребовалось 7 секунд для чтения и снова не имеет значения, какой именно диск. 7 секунд. И только цель - начать обработку как можно скорее и не дождаться окончания чтения.

Вы можете использовать:

QByteArray data = file.readAll();

Или вы можете использовать основную идею: (я не знаю, почему это занимает 7 секунд, чтобы прочитать, что позади)

 QFile file("in.txt");
 if (!file.open(QIODevice::ReadOnly | QIODevice::Text))
   return;

 QByteArray* data = new QByteArray;    
 int count = 0;
 while (!file.atEnd()) {
   ++count;
   data->append(file.readLine());
   if ( count > 10000 ) {
     ChunkProcessor *chunkprocessor = new ChunkProcessor(data);
     QThreadPool::globalInstance()->start(chunkprocessor);
     connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
     data = new QByteArray; 
     count = 0;
   }
 }

Один файл, один поток, читается почти так же быстро, как чтение по строке "без" прерывания. То, что вы делаете с данными, является другой проблемой, но не имеет ничего общего с I/O. Это уже в памяти. Поэтому только забота будет о файле 5 ГБ и окунах ОЗУ на машине.

Это очень простое решение, в котором вы нуждаетесь, - это подкласс QRunnable, функция повторного выполнения, выдавать сигнал, когда он закончен, передавать обработанные данные с помощью общего указателя и в объединении основного потока, что данные в одну структуру или что-то еще. Простое поточное решение.

Ответ 2

Я бы предложил многопоточное предложение с небольшим изменением: один поток посвящен чтению файла в предопределенном (настраиваемом) размере кусков и продолжает подавать данные в набор потоков (более чем один базовый процессорный процессор), Скажем, что конфигурация выглядит так:

размер куска = 50 МБ
Disk Thread = 1
Технологические потоки = 5

  • Создайте класс для чтения данных из файла. В этом классе он содержит структуру данных, которая используется для связи с потоками процессов. Например, эта структура будет содержать начальное смещение, окончательное смещение буфера чтения для каждого потока процесса. Для чтения данных файла класс читателя содержит 2 буфера для каждого размера блока (в этом случае 50 МБ).
  • Создайте класс процессов, который содержит указатели (общие) для буферов чтения и смещения структуры данных.
  • Теперь создайте драйвер (возможно, основной поток), создайте все потоки и ожидайте их завершения и обработайте сигналы.
  • Чтение потока вызывается с классом читателя, считывает 50 МБ данных и на основе количества потоков создает смещение объекта структуры данных. В этом случае t1 обрабатывает 0 - 10 МБ, t2 обрабатывает 10 - 20 МБ и так далее. Когда он готов, он уведомляет потоки процессора. Затем он сразу же считывает следующий фрагмент с диска и ждет уведомления о потоке процессора от потока процессов.
  • Потоки процессора в уведомлении, считывают данные из буфера и обрабатывают его. После этого он уведомляет поток читателя о завершении и ждет следующего фрагмента.
  • Этот процесс завершается до тех пор, пока все данные не будут прочитаны и обработаны. Затем поток читателя возвращается к основному потоку о завершении, который отправляет PROCESS_COMPLETION, после выхода всех потоков. или основной поток выбирает обработку следующего файла в очереди.

Обратите внимание, что смещения взяты для упрощения объяснения, программные преобразования для сопоставления разделителей строк должны обрабатываться программно.

Ответ 3

Если используемый вами анализатор не распространяется, очевидно, что подход не масштабируется.

Я бы проголосовал за такую ​​технику ниже

  • помещает файл в размер, который может обрабатываться с помощью ограничения времени машины/времени
  • распределите куски кластера машин (1.. *), которые могут соответствовать вашим требованиям времени/пространства
  • рассмотрим вопрос о размерах блоков для данного фрагмента
  • Избегайте потоков на одном ресурсе (например, данный блок), чтобы избавиться от всех проблем, связанных с потоком.
  • Используйте потоки для достижения неконкурентных (на ресурсе) операций - например, чтение в одном потоке и запись в другом потоке в другой файл.
  • выполните синтаксический анализ (теперь для этого небольшого фрагмента вы можете вызвать ваш парсер).
  • выполните свои операции.
  • объединить результаты назад/если они могут распространять их, как они есть.

Теперь, сказав это, почему вы не можете использовать фреймворки Hadoop?