Реализация потока буферизованного преобразования
Я пытаюсь реализовать поток с новым Node.js потоком API, который будет буферизовать определенный объем данных. Когда этот поток передается по каналу в другой поток или если что-то потребляет события readable
, этот поток должен очищать свой буфер, а затем просто переходить через проход. Ловушка состоит в том, что этот поток будет передан по каналам во многие другие потоки, и когда каждый целевой поток будет подключен, буфер должен быть сброшен, даже если он уже сброшен в другой поток.
Например:
-
BufferStream
реализует stream.Transform
и сохраняет внутренний кольцевой буфер 512 КБ.
-
ReadableStreamA
передается по каналу в экземпляр BufferStream
-
BufferStream
записывает в свой кольцевой буфер, считывая данные из ReadableStreamA
по мере его поступления. (Не имеет значения, потеряны ли данные, поскольку буфер перезаписывает старые данные.)
-
BufferStream
подается на WritableStreamB
-
WritableStreamB
получает весь буфер 512 Кбайт и продолжает получать данные, поскольку он написан от ReadableStreamA
до BufferStream
.
-
BufferStream
подается на WritableStreamC
-
WritableStreamC
также получает весь буфер 512 Кбайт, но этот буфер теперь отличается от того, что получил WritableStreamB
, поскольку с тех пор больше данных было записано в BufferStream
.
Возможно ли это с помощью API потоков? Единственный метод, о котором я могу думать, - создать объект с помощью метода, который запустит новый поток PassThrough для каждого пункта назначения, то есть я не мог бы просто подключиться к нему и из него.
Для чего это стоит, я сделал это со старым "текущим" API, просто слушая новые обработчики событий data
. Когда новая функция была присоединена с помощью .on('data')
, я бы назвал ее напрямую с копией кольцевого буфера.
Ответы
Ответ 1
Вот мой вопрос по вашей проблеме.
Основная идея - создать поток Transform
, который позволит нам выполнить вашу собственную логику буферизации перед отправкой данных на выходе потока:
var util = require('util')
var stream = require('stream')
var BufferStream = function (streamOptions) {
stream.Transform.call(this, streamOptions)
this.buffer = new Buffer('')
}
util.inherits(BufferStream, stream.Transform)
BufferStream.prototype._transform = function (chunk, encoding, done) {
// custom buffering logic
// ie. add chunk to this.buffer, check buffer size, etc.
this.buffer = new Buffer(chunk)
this.push(chunk)
done()
}
Затем нам нужно переопределить метод .pipe()
, чтобы мы были уведомлены, когда BufferStream
передается в поток, что позволяет нам автоматически записывать в него данные:
BufferStream.prototype.pipe = function (destination, options) {
var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
res.write(this.buffer)
return res
}
Таким образом, когда мы пишем buffer.pipe(someStream)
, мы выполняем канал по назначению и записываем внутренний буфер в выходной поток. После этого класс Transform
заботится обо всем, отслеживая противодавление и многое другое.
Вот рабочий принцип. Обратите внимание, что я не стал писать правильную логику буферизации (т.е. Мне не нужен размер внутреннего буфера), но это должно быть легко исправить.
Ответ 2
Пол отвечает хорошо, но я не думаю, что он соответствует конкретным требованиям. Похоже, что должно произойти то, что каждый раз, когда pipe() вызывается в этом потоке преобразования, ему нужно сначала сбросить буфер, который представляет все накопление данных между временем, когда был создан поток преобразования/(подключен к потоку источника) и время, когда оно было подключено к текущему потоку записи/назначения.
Что-то вроде этого может быть более правильным:
var BufferStream = function () {
stream.Transform.apply(this, arguments);
this.buffer = []; //I guess an array will do
};
util.inherits(BufferStream, stream.Transform);
BufferStream.prototype._transform = function (chunk, encoding, done) {
this.push(chunk ? String(chunk) : null);
this.buffer.push(chunk ? String(chunk) : null);
done()
};
BufferStream.prototype.pipe = function (destination, options) {
var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
this.buffer.forEach(function (b) {
res.write(String(b));
});
return res;
};
return new BufferStream();
Я полагаю, что это:
BufferStream.super_.prototype.pipe.apply(this, arguments);
эквивалентно этому:
stream.Transform.prototype.pipe.apply(this, arguments);
Вы могли бы оптимизировать это и использовать некоторые флаги при вызове pipe/unpipe.