Разбор очень больших файлов CSV с С++
Моя цель - проанализировать большие файлы csv с помощью С++ в проекте QT в среде OSX.
(Когда я говорю csv, я имею в виду tsv и другие варианты 1GB ~ 5GB).
Это похоже на простую задачу, но все усложняется, когда размер файлов становится больше. Я не хочу писать свой собственный синтаксический анализатор из-за множества краевых случаев, связанных с разбором файлов csv.
Я нашел различные библиотеки обработки csv для обработки этого задания, но разбор 1GB файла занимает около 90-120 секунд на моей машине, что неприемлемо. я ничего не делаю с данными прямо сейчас, я просто обрабатываю и отбрасываю данные для целей тестирования.
cccsvparser - одна из библиотек, которые я пробовал. Но единственной достаточно быстрой библиотекой была fast-cpp-csv-parser, которая дает приемлемые результаты: 15 секунд на моей машине, но она работает только тогда, когда структура файла Известно.
Пример использования: fast-cpp-csv-parser
#include "csv.h"
int main(){
io::CSVReader<3> in("ram.csv");
in.read_header(io::ignore_extra_column, "vendor", "size", "speed");
std::string vendor; int size; double speed;
while(in.read_row(vendor, size, speed)){
// do stuff with the data
}
}
Как вы можете видеть, я не могу загрузить произвольные файлы, и я должен определенно определять переменные в соответствии с моей файловой структурой. Я не знаю ни одного метода, который позволяет мне динамически создавать эти переменные во время выполнения.
Другой подход, который я пробовал, - это прочитать файл csv по строке fast-cpp-csv-parser класс LineReader, который очень быстро (около 7 secs, чтобы прочитать весь файл), а затем проанализировать каждую строку с помощью cccsvparser lib, которая может обрабатывать строки. но это занимает около 40 секунд до завершения, это улучшение по сравнению с первыми попытками, но все же неприемлемо.
Я видел различные вопросы stackoverflow, связанные с разбором файлов csv, ни один из них не требует большой обработки файлов для учетной записи.
Также я потратил много времени на поиски решения этой проблемы, и я действительно скучаю по свободе, которую менеджеры пакетов предпочитают npm или pip при поиске из готовых решений.
Я буду благодарен за любое предложение о том, как справиться с этой проблемой.
Edit:
При использовании подхода @fbucek время обработки сокращается до 25 секунд, что является большим улучшением.
можем ли мы оптимизировать это еще больше?
Ответы
Ответ 1
Я предполагаю, что вы используете только один поток.
Многопоточность может ускорить процесс.
Самое лучшее достижение пока 40 секунд. Давайте придерживаться этого.
Я предположил, что сначала вы читаете, затем обрабатываете → (около 7 секунд для чтения всего файла)
7 секунд для чтения
33 с для обработки
Прежде всего , вы можете разделить файл на куски, допустим, 50 МБ.
Это означает, что вы можете начать обработку после чтения 50 МБ файла. Вам не нужно ждать завершения всего файла.
Это 0,35 с для чтения (теперь это 0,35 + 33 секунды для обработки = примерно 34 сек)
Когда вы используете Multithreading, , вы можете обрабатывать несколько фрагментов за раз. Это может ускорить процесс теоретически до количества ваших ядер. Скажем, у вас 4 ядра.
Это 33/4 = 8,25 с.
Я думаю, вы можете ускорить обработку с 4 ядрами до 9 с..
Посмотрите QThreadPool и QRunnable или QtConcurrent
Я бы предпочел QThreadPool
Разделите задачу на части:
- Сначала попробуйте перебрать файл и разделить его на куски. И ничего не делайте с этим.
- Затем создайте класс "ChunkProcessor", который может обрабатывать этот фрагмент
- Сделайте "ChunkProcessor" подклассом QRunnable и в переопределенной функции run() выполните ваш процесс
- Если у вас есть куски, у вас есть класс, который может обрабатывать их, а этот класс совместим с QThreadPool, вы можете передать его в
Это может выглядеть так:
loopoverfile {
whenever chunk is ready {
ChunkProcessor *chunkprocessor = new ChunkProcessor(chunk);
QThreadPool::globalInstance()->start(chunkprocessor);
connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
}
}
Вы можете использовать std:: share_ptr для передачи обработанных данных, чтобы не использовать QMutex или что-то еще и избежать проблем с сериализацией с несколькими потоками доступа к некоторому ресурсу.
Примечание: для использования пользовательского сигнала вам необходимо зарегистрировать его перед использованием
qRegisterMetaType<std::shared_ptr<ProcessedData>>("std::shared_ptr<ProcessedData>");
Изменить: (на основе обсуждения мой ответ был неясно об этом)
Неважно, какой диск вы используете или насколько это быстро. Чтение - операция с одним потоком.
Это решение было предложено только потому, что потребовалось 7 секунд для чтения и снова не имеет значения, какой именно диск. 7 секунд. И только цель - начать обработку как можно скорее и не дождаться окончания чтения.
Вы можете использовать:
QByteArray data = file.readAll();
Или вы можете использовать основную идею: (я не знаю, почему это занимает 7 секунд, чтобы прочитать, что позади)
QFile file("in.txt");
if (!file.open(QIODevice::ReadOnly | QIODevice::Text))
return;
QByteArray* data = new QByteArray;
int count = 0;
while (!file.atEnd()) {
++count;
data->append(file.readLine());
if ( count > 10000 ) {
ChunkProcessor *chunkprocessor = new ChunkProcessor(data);
QThreadPool::globalInstance()->start(chunkprocessor);
connect(chunkprocessor, SIGNAL(finished(std::shared_ptr<ProcessedData>)), this, SLOT(readingFinished(std::shared_ptr<ProcessedData>)));
data = new QByteArray;
count = 0;
}
}
Один файл, один поток, читается почти так же быстро, как чтение по строке "без" прерывания.
То, что вы делаете с данными, является другой проблемой, но не имеет ничего общего с I/O. Это уже в памяти.
Поэтому только забота будет о файле 5 ГБ и окунах ОЗУ на машине.
Это очень простое решение, в котором вы нуждаетесь, - это подкласс QRunnable, функция повторного выполнения, выдавать сигнал, когда он закончен, передавать обработанные данные с помощью общего указателя и в объединении основного потока, что данные в одну структуру или что-то еще. Простое поточное решение.
Ответ 2
Я бы предложил многопоточное предложение с небольшим изменением: один поток посвящен чтению файла в предопределенном (настраиваемом) размере кусков и продолжает подавать данные в набор потоков (более чем один базовый процессорный процессор), Скажем, что конфигурация выглядит так:
размер куска = 50 МБ
Disk Thread = 1
Технологические потоки = 5
- Создайте класс для чтения данных из файла. В этом классе он содержит структуру данных, которая используется для связи с потоками процессов. Например, эта структура будет содержать начальное смещение, окончательное смещение буфера чтения для каждого потока процесса. Для чтения данных файла класс читателя содержит 2 буфера для каждого размера блока (в этом случае 50 МБ).
- Создайте класс процессов, который содержит указатели (общие) для буферов чтения и смещения структуры данных.
- Теперь создайте драйвер (возможно, основной поток), создайте все потоки и ожидайте их завершения и обработайте сигналы.
- Чтение потока вызывается с классом читателя, считывает 50 МБ данных и на основе количества потоков создает смещение объекта структуры данных. В этом случае t1 обрабатывает 0 - 10 МБ, t2 обрабатывает 10 - 20 МБ и так далее. Когда он готов, он уведомляет потоки процессора. Затем он сразу же считывает следующий фрагмент с диска и ждет уведомления о потоке процессора от потока процессов.
- Потоки процессора в уведомлении, считывают данные из буфера и обрабатывают его. После этого он уведомляет поток читателя о завершении и ждет следующего фрагмента.
- Этот процесс завершается до тех пор, пока все данные не будут прочитаны и обработаны. Затем поток читателя возвращается к основному потоку о завершении, который отправляет PROCESS_COMPLETION, после выхода всех потоков. или основной поток выбирает обработку следующего файла в очереди.
Обратите внимание, что смещения взяты для упрощения объяснения, программные преобразования для сопоставления разделителей строк должны обрабатываться программно.
Ответ 3
Если используемый вами анализатор не распространяется, очевидно, что подход не масштабируется.
Я бы проголосовал за такую технику ниже
- помещает файл в размер, который может обрабатываться с помощью ограничения времени машины/времени
- распределите куски кластера машин (1.. *), которые могут соответствовать вашим требованиям времени/пространства
- рассмотрим вопрос о размерах блоков для данного фрагмента
- Избегайте потоков на одном ресурсе (например, данный блок), чтобы избавиться от всех проблем, связанных с потоком.
- Используйте потоки для достижения неконкурентных (на ресурсе) операций - например, чтение в одном потоке и запись в другом потоке в другой файл.
- выполните синтаксический анализ (теперь для этого небольшого фрагмента вы можете вызвать ваш парсер).
- выполните свои операции.
- объединить результаты назад/если они могут распространять их, как они есть.
Теперь, сказав это, почему вы не можете использовать фреймворки Hadoop?