Написание больших файлов с помощью Node.js
Я пишу большой файл с node.js с помощью записываемого потока:
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var lines;
while (lines = getLines()) {
for (var i = 0; i < lines.length; i++) {
stream.write( lines[i] );
}
}
Мне интересно, безопасна ли эта схема, не используя drain
событие? Если это не так (что, на мой взгляд, так), каков шаблон для записи произвольных больших данных в файл?
Ответы
Ответ 1
Вот как я, наконец, это сделал. Идея состоит в том, чтобы создать читаемый поток, реализующий интерфейс ReadStream, а затем использовать метод pipe()
для передачи данных в записываемый поток.
var fs = require('fs');
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var readStream = new MyReadStream();
readStream.pipe(writeStream);
writeStream.on('close', function () {
console.log('All done!');
});
Пример класса MyReadStream
можно взять из mongoose QueryStream.
Ответ 2
Идея утечки заключается в том, что вы использовали бы ее для тестирования здесь:
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});
var lines;
while (lines = getLines()) {
for (var i = 0; i < lines.length; i++) {
stream.write(lines[i]); //<-- the place to test
}
}
которым вы не являетесь. Таким образом, вам понадобится перестроить, чтобы сделать его "реентерабельным".
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});
var lines;
while (lines = getLines()) {
for (var i = 0; i < lines.length; i++) {
var written = stream.write(lines[i]); //<-- the place to test
if (!written){
//do something here to wait till you can safely write again
//this means prepare a buffer and wait till you can come back to finish
// lines[i] -> remainder
}
}
}
Однако, означает ли это, что вам нужно продолжать буферизацию getLines, пока вы ждете?
var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});
var lines,
buffer = {
remainingLines = []
};
while (lines = getLines()) {
for (var i = 0; i < lines.length; i++) {
var written = stream.write(lines[i]); //<-- the place to test
if (!written){
//do something here to wait till you can safely write again
//this means prepare a buffer and wait till you can come back to finish
// lines[i] -> remainder
buffer.remainingLines = lines.slice(i);
break;
//notice there no way to re-run this once we leave here.
}
}
}
stream.on('drain',function(){
if (buffer.remainingLines.length){
for (var i = 0; i < buffer.remainingLines.length; i++) {
var written = stream.write(buffer.remainingLines[i]); //<-- the place to test
if (!written){
//do something here to wait till you can safely write again
//this means prepare a buffer and wait till you can come back to finish
// lines[i] -> remainder
buffer.remainingLines = lines.slice(i);
}
}
}
});
Ответ 3
[Изменить] Обновленные Node.js writable.write(...)
API-документы говорят:
[The] возвращаемое значение строго рекомендательно. Вы МОЖЕТЕ продолжать писать, даже если оно возвращает false. Однако записи будут буферизированы в памяти, поэтому лучше не делать этого чрезмерно. Вместо этого дождитесь события стока, прежде чем записывать больше данных.
[Оригинал] Из stream.write(...)
документации (выделено мной):
Возвращает true
, если строка была сброшена в буфер ядра. Возвращает false
, чтобы указать, что буфер ядра заполнен, и данные будут отправлены в будущем.
Я интерпретирую это как означающее, что функция "write" возвращает true
, если данная строка была немедленно записана в базовый буфер OS или false
, если она еще не была записана, но будет записана функцией записи (например, был предположительно буферизирован для вас с помощью WriteStream), так что вам не нужно снова называть "писать".
Ответ 4
Я обнаружил, что потоки неэффективны для работы с большими файлами - это связано с тем, что вы не можете установить достаточный размер входного буфера (по крайней мере, я не знаю, как это сделать). Это то, что я делаю:
var fs = require('fs');
var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');
var buf = new Buffer(1024 * 1024), len, prev = '';
while(len = fs.readSync(i, buf, 0, buf.length)) {
var a = (prev + buf.toString('ascii', 0, len)).split('\n');
prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : '';
var out = '';
a.forEach(function(line) {
if(!line)
return;
// do something with your line here
out += line + '\n';
});
var bout = new Buffer(out, 'ascii');
fs.writeSync(o, bout, 0, bout.length);
}
fs.closeSync(o);
fs.closeSync(i);
Ответ 5
Несколько предложенных ответов на этот вопрос вообще не касались потоков.
Этот модуль может помочь https://www.npmjs.org/package/JSONStream
Однако давайте предположим ситуацию, описанную и напишем код самостоятельно. Вы читаете из MongoDB как поток, с ObjectMode = true по умолчанию.
Это приведет к возникновению проблем, если вы попытаетесь напрямую передать поток в файл - что-то вроде ошибки "Неверный номер строки/буфера".
Решение этого типа проблемы очень просто.
Просто поставьте другое Преобразование между читаемым и записываемым, чтобы соответствующим образом адаптировать Object, читаемый к записи String.
Пример кода:
var fs = require('fs'),
writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
stream = require('stream'),
stringifier = new stream.Transform();
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
this.push(JSON.stringify(data));
this.push('\n');
done();
}
rowFeedDao.getRowFeedsStream(merchantId, jobId)
.pipe(stringifier)
.pipe(writeStream).on('error', function (err) {
// handle error condition
}
Ответ 6
Самый чистый способ справиться с этим - сделать ваш генератор строк читаемым потоком - позвоните ему lineReader
. Затем следующее будет автоматически обрабатывать буферы и эффективно дренировать вас:
lineReader.pipe(fs.createWriteStream('someFile.txt'));
Если вы не хотите создавать читаемый поток, вы можете прослушивать вывод write
для заполнения буфера и отвечать следующим образом:
var i = 0, n = lines.length;
function write () {
if (i === n) return; // A callback could go here to know when it done.
while (stream.write(lines[i++]) && i < n);
stream.once('drain', write);
}
write(); // Initial call.
Более длинный пример этой ситуации можно найти здесь.