Ответ 1
В принципе, как я понимаю, вы ищете возможность создания противодавления в потоке в случае события ошибки. У вас есть несколько вариантов.
Во-первых, как вы уже определили, используйте pipe
, чтобы захватить экземпляр потока чтения и сделать некоторые причудливые работы.
Другой вариант - создать поток записи для записи, который обеспечивает эту функциональность (т.е. он принимает WritableStream
в качестве ввода, а при реализации функций потока передает данные вместе с предоставленным потоком.
В основном вы получаете что-то вроде
source stream -> wrapping writable -> writable
https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream имеет дело с реализацией записываемого потока.
Ключ для вас состоит в том, что если в базовом записываемом файле возникает ошибка, вы должны установить флаг в потоке, а следующий вызов write
произойдет, вы будете буферизовать кусок, сохранить обратный вызов и только позвонить, Что-то вроде
// ...
constructor(wrappedWritableStream) {
wrappedWritableStream.on('error', this.errorHandler);
this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
if (this.hadError) {
// Note: until callback is called, this function won't be called again, so we will have maximum one stored
// chunk.
this.bufferedChunk = [chunk, encoding, callback];
} else {
wrappedWritableStream.write(chunk, encoding, callback);
}
}
// ...
errorHandler(err) {
console.error(err);
this.hadError = err;
this.emit(err);
}
// ...
recoverFromError() {
if (this.bufferedChunk) {
wrappedWritableStream.write(...this.bufferedChunk);
this.bufferedChunk = undefined;
}
this.hadError = false;
}
Примечание. Вам нужно только реализовать функцию write
, но я призываю вас копаться и играть с другими функциями реализации.
Также стоит отметить, что у вас могут возникнуть проблемы с потоками, которые испустили событие с ошибкой, но я оставлю это вам как отдельную проблему для решения.
Здесь еще один хороший ресурс по противодавлению https://nodejs.org/en/docs/guides/backpressuring-in-streams/