Каков правильный способ приостановить чтение прочитанного в потоковом потоке потока из записи в nodejs?

Я пишу модуль, который является записываемым потоком. Я хочу реализовать интерфейс pipe для своих пользователей.

Если возникает некоторая ошибка, мне нужно приостановить чтение и испустить ошибку. Затем пользователь решит - если он в порядке с ошибкой, он должен иметь возможность возобновить обработку данных.

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);

Я вижу, что node предоставляет нам метод readable.pause(), который можно использовать для приостановки чтения. Но я не могу получить, как я могу назвать это из моего записываемого модуля потока:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}

Как противодавление может быть реализовано в записываемом потоке?

P.S. Можно использовать события pipe/unpipe, которые предоставляют читаемый поток в качестве параметра. Но также говорится, что для потоков с каналами единственный шанс приостановить - это освободить читаемый поток от записи.

У меня все получилось? Я должен отключить мой записываемый поток, пока пользователь не назовет возобновление? И после того, как пользователь возобновит работу, я должен отправить отчетный поток обратно?

Ответы

Ответ 1

В принципе, как я понимаю, вы ищете возможность создания противодавления в потоке в случае события ошибки. У вас есть несколько вариантов.

Во-первых, как вы уже определили, используйте pipe, чтобы захватить экземпляр потока чтения и сделать некоторые причудливые работы.

Другой вариант - создать поток записи для записи, который обеспечивает эту функциональность (т.е. он принимает WritableStream в качестве ввода, а при реализации функций потока передает данные вместе с предоставленным потоком.

В основном вы получаете что-то вроде

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream имеет дело с реализацией записываемого потока.

Ключ для вас состоит в том, что если в базовом записываемом файле возникает ошибка, вы должны установить флаг в потоке, а следующий вызов write произойдет, вы будете буферизовать кусок, сохранить обратный вызов и только позвонить, Что-то вроде

// ...
constructor(wrappedWritableStream) {
    wrappedWritableStream.on('error', this.errorHandler);
    this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
    if (this.hadError) {
        // Note: until callback is called, this function won't be called again, so we will have maximum one stored
        //  chunk.
        this.bufferedChunk = [chunk, encoding, callback];
    } else {
        wrappedWritableStream.write(chunk, encoding, callback);
    }
}
// ...
errorHandler(err) {
    console.error(err);
    this.hadError = err;
    this.emit(err);
}
// ...
recoverFromError() {
    if (this.bufferedChunk) {
        wrappedWritableStream.write(...this.bufferedChunk);
        this.bufferedChunk = undefined;
    }
    this.hadError = false;
}

Примечание. Вам нужно только реализовать функцию write, но я призываю вас копаться и играть с другими функциями реализации.

Также стоит отметить, что у вас могут возникнуть проблемы с потоками, которые испустили событие с ошибкой, но я оставлю это вам как отдельную проблему для решения.

Здесь еще один хороший ресурс по противодавлению https://nodejs.org/en/docs/guides/backpressuring-in-streams/