Ответ 1
Оба Observables и node.js Потоки позволяют решить одну и ту же основную проблему: асинхронно обрабатывать последовательность значений. Основное различие между этими двумя, я считаю, связано с контекстом, который мотивировал его появление. Этот контекст отражен в терминологии и API.
В стороне Observables у вас есть расширение для EcmaScript, которое вводит модель реактивного программирования. Он пытается заполнить пробел между генерированием ценности и асинхронностью с минималистскими и составными понятиями Observer
и Observable
.
В разделе node.js и Streams вы хотели создать интерфейс для асинхронной и эффективной обработки сетевых потоков и локальных файлов. Терминология вытекает из этого начального контекста, и вы получаете pipe
, chunk
, encoding
, flush
, Duplex
, Buffer
и т.д. Имея прагматичный подход, который обеспечивает явную поддержку для конкретных случаев использования, вы теряют способность сочинять вещи, потому что это не так единообразно. Например, вы используете push
в потоке Readable
и write
в Writable
, хотя, по идее, вы делаете то же самое: публикуете значение.
Итак, на практике, если вы посмотрите на концепции, и если вы используете опцию { objectMode: true }
, вы можете сопоставить Observable
с потоком Readable
и Observer
с потоком Writable
. Вы даже можете создать несколько простых адаптеров между двумя моделями.
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
Возможно, вы заметили, что я изменил несколько имен и использовал более простые понятия Observer
и Subscription
, введенные здесь, чтобы избежать перегрузки ответственности, выполняемой Observables в Generator
. В принципе, Subscription
позволяет отписаться от него Observable
. Во всяком случае, с приведенным выше кодом вы можете иметь pipe
.
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
По сравнению с process.stdin.pipe(process.stdout)
у вас есть способ комбинировать, фильтровать и преобразовывать потоки, которые также работают для любой другой последовательности данных. Вы можете достичь этого с помощью потоков Readable
, Transform
и Writable
, но API поддерживает подклассу вместо привязки Readable
и применения функций. В модели Observable
Например, преобразование значений соответствует применению функции трансформатора к потоку. Он не требует нового подтипа Transform
.
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
Вывод? Легко ввести реактивную модель и концепцию Observable
в любом месте. Сложнее реализовать целую библиотеку вокруг этой концепции. Все эти небольшие функции должны работать последовательно. В конце концов, проект ReactiveX все еще продолжается. Но если вам действительно нужно отправить содержимое файла клиенту, поработать с кодировкой, а затем закрепить его в поддержке NodeJS, и он работает очень хорошо.