Node.js Проводка одного и того же читаемого потока на несколько (записываемых) целей
Мне нужно запустить сразу две команды, которые должны читать данные из одного потока.
После подачи потока в другой буфер освобождается, поэтому я не могу читать данные из этого потока снова, поэтому это не работает:
var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');
var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);
inputStream.pipe(identify.stdin);
var chunks = [];
identify.stdout.on('data',function(chunk) {
chunks.push(chunk);
});
identify.stdout.on('end',function() {
var size = getSize(Buffer.concat(chunks)); //width
var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
inputStream.pipe(convert.stdin);
convert.stdout.pipe(fs.createWriteStream('half.png'));
});
function getSize(buffer){
return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}
Запрос жалуется на это
Error: You cannot pipe after data has been emitted from the response.
и изменение inputStream на fs.createWriteStream
дает ту же проблему, конечно.
Я не хочу записывать в файл, но каким-то образом повторно использовать поток, создаваемый request (или любой другой).
Есть ли способ повторно использовать читаемый поток после завершения конвейера?
Какой был бы лучший способ выполнить что-то вроде приведенного выше примера?
Ответы
Ответ 1
Вы не можете повторно использовать переданные данные, которые уже отправлены. И вы не можете передать поток после своего "конца". Таким образом, вы не можете обрабатывать тот же поток дважды, и вам нужны два потока. Вы должны создать дубликат потока, подключив его к двум потокам. Вы можете создать простой поток с потоком PassThrough, он просто передает ввод на вывод.
spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;
a.stdout.pipe(b);
a.stdout.pipe(c);
count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });
Выход
8
hi user
Ответ 2
Первый ответ работает только в том случае, если потоки занимают примерно одинаковое количество времени для обработки данных. Если кто-то занимает значительно больше времени, тем быстрее запрашиваются новые данные, следовательно, перезаписываются данные, все еще используемые более медленным (у меня была эта проблема после попытки ее решить с использованием дублированного потока).
Следующий шаблон работал очень хорошо для меня. Он использует библиотеку на основе потоков Stream2, Streamz и Promises для синхронизации потоков async посредством обратного вызова. Используя знакомый пример из первого ответа:
spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');
a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;
a.stdout.pipe(streamz(combineStreamOperations));
function combineStreamOperations(data, next){
Promise.join(b, c, function(b, c){ //perform n operations on the same data
next(); //request more
}
count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });
Ответ 3
Для общей проблемы следующий код работает отлично
var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
console.log('b1:', data.toString())
})
b2.on('data', function(data) {
console.log('b2:', data.toString())
})
a.write('text')
Ответ 4
Как насчет трубопроводов в два или более потока не в одно и то же время?
Например:
var PassThrough = require('stream').PassThrough;
var mybiraryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'})
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'})
var mypass = PassThrough
mybinaryStream.pipe(mypass)
mypass.pipe(file1)
setTimeout(function(){
mypass.pipe(file2);
},2000)
Приведенный выше код не вызывает ошибок, но файл2 пуст
Ответ 5
У меня есть другое решение для записи в два потока одновременно, естественно, время для записи будет добавлением двух раз, но я использую его для ответа на запрос загрузки, где я хочу сохранить копию загруженный файл на моем сервере (на самом деле я использую резервную копию S3, поэтому я кэширую наиболее используемые файлы локально, чтобы избежать множественной передачи файлов)
/**
* A utility class made to write to a file while answering a file download request
*/
class TwoOutputStreams {
constructor(streamOne, streamTwo) {
this.streamOne = streamOne
this.streamTwo = streamTwo
}
setHeader(header, value) {
if (this.streamOne.setHeader)
this.streamOne.setHeader(header, value)
if (this.streamTwo.setHeader)
this.streamTwo.setHeader(header, value)
}
write(chunk) {
this.streamOne.write(chunk)
this.streamTwo.write(chunk)
}
end() {
this.streamOne.end()
this.streamTwo.end()
}
}
Затем вы можете использовать это как обычный OutputStream
const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)
и передать его вашему методу, как если бы это был ответ или файлOutputStream