Разбор огромных лог файлов в Node.js - чтение в строке
Мне нужно выполнить парсинг больших (5-10 Гб) лог файлов в Javascript/ Node.js(я использую Cube).
Логлайн выглядит примерно так:
10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".
Нам нужно прочитать каждую строку, выполнить разбор (например, вырезать 5
, 7
и SUCCESS
), затем перекачать эти данные в Cube (https://github.com/square/cube), используя их JS-клиент.
Во-первых, каков канонический путь в Node для чтения в файле, строка за строкой?
Кажется, это довольно распространенный вопрос онлайн:
Многие ответы, похоже, указывают на группу сторонних модулей:
Однако это кажется довольно простой задачей - конечно, есть простой способ внутри stdlib читать в текстовом файле по очереди?
Во-вторых, мне нужно обработать каждую строку (например, преобразовать метку времени в объект Date и извлечь полезные поля).
Какой лучший способ сделать это, максимизируя пропускную способность? Есть ли способ, который не будет блокироваться ни чтением в каждой строке, ни при отправке его в Cube?
В-третьих - я предполагаю использование строковых разделов, а эквивалент JS содержит (IndexOf!= -1?) будет намного быстрее, чем регулярные выражения? Кто-нибудь имел большой опыт в анализе массивных текстовых данных в node.js?
Cheers,
Виктор
Ответы
Ответ 1
Я искал решение для анализа очень больших файлов (gbs) построчно, используя поток. Все сторонние библиотеки и примеры не соответствовали моим потребностям, поскольку они обрабатывали файлы не построчно (например, 1, 2, 3, 4..) или считывали весь файл в память
Следующее решение может анализировать очень большие файлы построчно, используя stream & pipe. Для тестирования я использовал файл 2,1 ГБ с 17 000 000 записей. Использование оперативной памяти не превышало 60 мб.
var fs = require('fs')
, es = require('event-stream');
var lineNr = 0;
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line){
// pause the readstream
s.pause();
lineNr += 1;
// process line here and call s.resume() when rdy
// function below was for logging memory usage
logMemoryUsage(lineNr);
// resume the readstream, possibly from a callback
s.resume();
})
.on('error', function(err){
console.log('Error while reading file.', err);
})
.on('end', function(){
console.log('Read entire file.')
})
);
![enter image description here]()
Пожалуйста, дайте мне знать, как это происходит!
Ответ 2
Вы можете использовать встроенный пакет readline
, см. docs здесь. Я использую stream для создания нового выходного потока.
var fs = require('fs'),
readline = require('readline'),
stream = require('stream');
var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface({
input: instream,
output: outstream,
terminal: false
});
rl.on('line', function(line) {
console.log(line);
//Do your stuff ...
//Then write to outstream
rl.write(cubestuff);
});
Большим файлам потребуется некоторое время для обработки. Скажите, работает ли он.
Ответ 3
Мне действительно понравился ответ @gerard, который на самом деле заслуживает того, чтобы быть правильным ответом здесь. Я сделал некоторые улучшения:
- Код находится в классе (модульном)
- Разбор включен
- Возможность возобновления предоставляется внешнему в случае, если асинхронное задание привязано к чтению CSV, например, вставка в БД или запрос HTTP
- Чтение в размерах кусков/батчей, которые
пользователь может объявить. Я также позаботился о кодировании в потоке, в случае
у вас есть файлы в разных кодировках.
Здесь код:
'use strict'
const fs = require('fs'),
util = require('util'),
stream = require('stream'),
es = require('event-stream'),
parse = require("csv-parse"),
iconv = require('iconv-lite');
class CSVReader {
constructor(filename, batchSize, columns) {
this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
this.batchSize = batchSize || 1000
this.lineNumber = 0
this.data = []
this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
}
read(callback) {
this.reader
.pipe(es.split())
.pipe(es.mapSync(line => {
++this.lineNumber
parse(line, this.parseOptions, (err, d) => {
this.data.push(d[0])
})
if (this.lineNumber % this.batchSize === 0) {
callback(this.data)
}
})
.on('error', function(){
console.log('Error while reading file.')
})
.on('end', function(){
console.log('Read entirefile.')
}))
}
continue () {
this.data = []
this.reader.resume()
}
}
module.exports = CSVReader
Итак, вот как вы его будете использовать:
let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())
Я тестировал это с помощью файла CSV на 35 ГБ, и он работал у меня, и поэтому я решил построить его на @gerard, приветствуются отзывы.
Ответ 4
Я использовал https://www.npmjs.com/package/line-by-line для чтения более 1 000 000 строк из текстового файла. В этом случае занятая емкость ОЗУ составляла около 50-60 мегабайт.
const LineByLineReader = require('line-by-line'),
lr = new LineByLineReader('big_file.txt');
lr.on('error', function (err) {
// 'err' contains error object
});
lr.on('line', function (line) {
// pause emitting of lines...
lr.pause();
// ...do your asynchronous line processing..
setTimeout(function () {
// ...and continue emitting lines.
lr.resume();
}, 100);
});
lr.on('end', function () {
// All lines are read, file is closed now.
});
Ответ 5
Помимо чтения большого файла построчно, вы также можете читать его по частям. Для получения дополнительной информации обратитесь к этой статье
var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
offset += bytesRead;
var str = chunkBuffer.slice(0, bytesRead).toString();
var arr = str.split('\n');
if(bytesRead = chunkSize) {
// the last item of the arr may be not a full line, leave it to the next chunk
offset -= arr.pop().length;
}
lines.push(arr);
}
console.log(lines);
Ответ 6
У меня была такая же проблема. После сравнения нескольких модулей, которые, похоже, имеют эту функцию, я решил сделать это сам, это проще, чем я думал.
gist: https://gist.github.com/deemstone/8279565
var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No.
Он покрывает файл, открытый в закрытии, который возвращает fetchBlock()
, извлекает блок из файла, заканчивает разделение на массив (будет обрабатывать сегмент из последней выборки).
Я установил размер блока 1024 для каждой операции чтения. У этого могут быть ошибки, но логика кода очевидна, попробуйте сами.
Ответ 7
node -byline использует потоки, поэтому я бы предпочел, чтобы один для ваших огромных файлов.
для ваших конверсий даты я бы использовал moment.js.
чтобы максимально увеличить пропускную способность, вы могли бы подумать об использовании программного кластера. есть несколько хороших модулей, которые довольно хорошо завершают node -нативный кластер-модуль. Мне нравится cluster-master от isaacs. например вы можете создать кластер из x рабочих, которые все вычислили файл.
для сравнительных разрывов по сравнению с регулярными выражениями используйте benchmark.js. Я не тестировал его до сих пор. benchmark.js доступен как node -модуль
Ответ 8
На основе этого ответа на вопрос я реализовал класс, который вы можете использовать для чтения файла синхронно по очереди с fs.readSync()
. Вы можете сделать эту "паузу" и "возобновить", используя обещание Q
(jQuery
, похоже, требует DOM, поэтому не могу запустить его с помощью nodejs
):
var fs = require('fs');
var Q = require('q');
var lr = new LineReader(filenameToLoad);
lr.open();
var promise;
workOnLine = function () {
var line = lr.readNextLine();
promise = complexLineTransformation(line).then(
function() {console.log('ok');workOnLine();},
function() {console.log('error');}
);
}
workOnLine();
complexLineTransformation = function (line) {
var deferred = Q.defer();
// ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
return deferred.promise;
}
function LineReader (filename) {
this.moreLinesAvailable = true;
this.fd = undefined;
this.bufferSize = 1024*1024;
this.buffer = new Buffer(this.bufferSize);
this.leftOver = '';
this.read = undefined;
this.idxStart = undefined;
this.idx = undefined;
this.lineNumber = 0;
this._bundleOfLines = [];
this.open = function() {
this.fd = fs.openSync(filename, 'r');
};
this.readNextLine = function () {
if (this._bundleOfLines.length === 0) {
this._readNextBundleOfLines();
}
this.lineNumber++;
var lineToReturn = this._bundleOfLines[0];
this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
return lineToReturn;
};
this.getLineNumber = function() {
return this.lineNumber;
};
this._readNextBundleOfLines = function() {
var line = "";
while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
this.idxStart = 0
while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
line = this.leftOver.substring(this.idxStart, this.idx);
this._bundleOfLines.push(line);
this.idxStart = this.idx + 1;
}
this.leftOver = this.leftOver.substring(this.idxStart);
if (line !== "") {
break;
}
}
};
}
Ответ 9
Документация Node.js предлагает очень элегантный пример использования модуля Readline.
Пример: построчное чтение файлового потока
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('sample.txt'),
crlfDelay: Infinity
});
rl.on('line', (line) => {
console.log('Line from file: ${line}');
});
Примечание: мы используем опцию crlfDelay, чтобы распознать все экземпляры CR LF ('\ r\n') как разрыв строки.
Ответ 10
Я сделал модуль node для чтения асинхронного текста больших файлов или JSON.
Протестировано на больших файлах.
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
module.exports = FileReader;
function FileReader(){
}
FileReader.prototype.read = function(pathToFile, callback){
var returnTxt = '';
var s = fs.createReadStream(pathToFile)
.pipe(es.split())
.pipe(es.mapSync(function(line){
// pause the readstream
s.pause();
//console.log('reading line: '+line);
returnTxt += line;
// resume the readstream, possibly from a callback
s.resume();
})
.on('error', function(){
console.log('Error while reading file.');
})
.on('end', function(){
console.log('Read entire file.');
callback(returnTxt);
})
);
};
FileReader.prototype.readJSON = function(pathToFile, callback){
try{
this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
}
catch(err){
throw new Error('json file is not valid! '+err.stack);
}
};
Просто сохраните файл как файл-reader.js и используйте его следующим образом:
var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
Ответ 11
import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row {
[s: string]: string;
}
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader {
protected file: string;
protected csvOptions = {
delimiter: ',',
headers: true,
ignoreEmpty: true,
trim: true
};
constructor(file: string, csvOptions = {}) {
if (!fs.existsSync(file)) {
throw new Error('File ${file} not found.');
}
this.file = file;
this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
}
public read(callback: RowCallBack): Promise < Array < object >> {
return new Promise < Array < object >> (resolve => {
const readStream = fs.createReadStream(this.file);
const results: Array < any > = [];
let index = 0;
const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
index++;
results.push(await callback(data, index));
}).on('error', (err: Error) => {
console.error(err.message);
throw err;
}).on('end', () => {
resolve(results);
});
readStream.pipe(csvStream);
});
}
}
import { CSVReader } from '../src/helpers/CSVReader';
(async () => {
const reader = new CSVReader('./database/migrations/csv/users.csv');
const users = await reader.read(async data => {
return {
username: data.username,
name: data.name,
email: data.email,
cellPhone: data.cell_phone,
homePhone: data.home_phone,
roleId: data.role_id,
description: data.description,
state: data.state,
};
});
console.log(users);
})();