Как использовать событие стока потока. Записывается в Node.js
В Node.js Я использую метод fs.createWriteStream
для добавления данных в локальный файл. В документации Node они упоминают событие drain
при использовании fs.createWriteStream
, но я этого не понимаю.
var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);
В приведенном выше коде, как я могу использовать событие стока? Правильно ли это событие используется?
var data = 'this is my data';
if (!streamExists) {
var stream = fs.createWriteStream('fileName.txt');
}
var result = stream.write(data);
if (!result) {
stream.once('drain', function() {
stream.write(data);
});
}
Ответы
Ответ 1
Событие drain
- это когда освобожден внутренний буфер записи для записи.
Это может произойти только в том случае, когда размер внутреннего буфера превысил его свойство highWaterMark
, которое является максимальным байтом данных, который может быть сохранен внутри внутреннего буфера для записи, до тех пор, пока он не перестанет считываться из источника данных.
Причиной чего-то подобного может быть из-за настроек, которые связаны с чтением источника данных из одного потока быстрее, чем его можно записать на другой ресурс. Например, возьмите два потока:
var fs = require('fs');
var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');
Теперь представьте, что файл read
находится на SSD и может читать со скоростью 500 МБ/с и write
находится на жестком диске, который может записываться только в 150MB/s
. Поток записи не сможет идти в ногу и начнет хранить данные во внутреннем буфере. Как только буфер достигнет highWaterMark
, который по умолчанию составляет 16 КБ, записи начнут возвращать false
, и поток будет внутренне стоять в очереди на утечку. После того, как длина внутреннего буфера равна 0, произойдет событие drain
.
Вот как работает дренаж:
if (state.length === 0 && state.needDrain) {
state.needDrain = false;
stream.emit('drain');
}
И это предпосылки для стока, которые являются частью функции writeOrBuffer
:
var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
Чтобы узнать, как используется событие drain
, возьмите пример из документации Node.js.
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
do {
i -= 1;
if (i === 0) {
// last time!
writer.write(data, encoding, callback);
} else {
// see if we should continue, or wait
// don't pass the callback, because we're not done yet.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// had to stop early!
// write some more once it drains
writer.once('drain', write);
}
}
}
Задача функции - записать 1000 000 раз в записываемый поток. Случается, что переменная ok
установлена в true, и цикл выполняется только тогда, когда ok
является истинным. Для каждой итерации цикла значение ok
устанавливается в значение stream.write()
, которое вернет false, если требуется drain
. Если ok
становится ложным, тогда обработчик события для drain
ждет и загорается, возобновляет запись.
Что касается вашего кода, вам не нужно использовать событие drain
, потому что вы пишете только один раз сразу после открытия вашего потока. Поскольку вы еще ничего не писали в потоке, внутренний буфер пуст, и вам нужно будет писать не менее 16 КБ в кусках, чтобы событие drain
срабатывало. Событие drain
предназначено для записи много раз с большим количеством данных, чем параметр highWaterMark
вашего записываемого потока.
Ответ 2
Представьте, что вы подключаете 2 потока с очень разной пропускной способностью, скажем, для загрузки локального файла на медленный сервер. (Быстрый) поток файлов будет генерировать данные быстрее, чем (медленный) поток сокета может его потреблять.
В этой ситуации node.js будет хранить данные в памяти до тех пор, пока медленный поток не получит возможность обработать его. Это может стать проблематичным, если файл очень большой.
Чтобы избежать этого, Stream.write
возвращает false
, когда базовый системный буфер заполнен. Если вы перестанете писать, поток позже выведет событие drain
, чтобы указать, что системный буфер опустел, и повторите запись.
Вы можете использовать pause/resume
читаемый поток и управлять пропускной способностью читаемого потока.
Лучше: вы можете использовать readable.pipe(writable)
, который сделает это для вас.
EDIT. В вашем коде есть ошибка: независимо от того, что возвращает write
, ваши данные были написаны. Вам не нужно повторять это. В вашем случае вы пишете data
дважды.
Что-то вроде этого будет работать:
var packets = […],
current = -1;
function niceWrite() {
current += 1;
if (current === packets.length)
return stream.end();
var nextPacket = packets[current],
canContinue = stream.write(nextPacket);
// wait until stream drains to continue
if (!canContinue)
stream.once('drain', niceWrite);
else
niceWrite();
}
Ответ 3
Вот версия с async/await
const write = (writer, data) => {
return new Promise((resolve) => {
if (!writer.write(data)) {
writer.once('drain', resolve)
}
else {
resolve()
}
})
}
// usage
const run = async () => {
const write_stream = fs.createWriteStream('...')
const max = 1000000
let current = 0
while (current <= max) {
await write(write_stream, current++)
}
}
https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73
Ответ 4
Это оптимизированная по скорости версия с использованием Promises (async/await). Вызывающий должен проверить, получает ли он promise
обратно, и только в этом случае должен быть вызван await
. Ожидание при каждом вызове может замедлить программу в 3 раза...
const write = (writer, data) => {
// return a promise only when we get a drain
if (!writer.write(data)) {
return new Promise((resolve) => {
writer.once('drain', resolve)
})
}
}
// usage
const run = async () => {
const write_stream = fs.createWriteStream('...')
const max = 1000000
let current = 0
while (current <= max) {
const promise = write(write_stream, current++)
// since drain happens rarely, awaiting each write call is really slow.
if (promise) {
// we got a drain event, therefore we wait
await promise
}
}
}