Отложенный рисунок с наблюдаемыми RxJS 5
Для произвольной реализации обещания отложенный шаблон (не путать с antipattern) может выглядеть так:
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
Объект
deferred
имеет нерегулярное обещание, которое может быть передано в другие области функций по ссылке. Все цепочки обещаний будут выполняться по договоренности с обещаниями, неважно, было ли deferred.promise
до привязки с then
или после. Состояние обещания не может быть изменено после его урегулирования.
Как следует из ответа, исходные варианты: ReplaySubject
и AsyncSubject
.
Для данной настройки (демо)
var subject = new Rx.AsyncSubject;
var deferred = subject.first();
deferred.subscribe(
console.log.bind(console, 'Early result'),
console.log.bind(console, 'Early error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Late result'),
console.log.bind(console, 'Late error')
);
});
Это приводит к желаемому поведению:
subject.error('one');
subject.next('two');
Ранняя ошибка
Поздняя ошибка 1
Это приводит к нежелательному поведению:
subject.error('one');
subject.next('two');
subject.complete();
Ранняя ошибка
Поздний результат два
Это приводит к нежелательному поведению:
subject.next('two');
subject.complete();
subject.next('three');
Ранний результат два
Поздний результат три
Результаты от ReplaySubject
отличаются, но все же не соответствуют ожидаемым результатам. next
и error
ошибки обрабатываются отдельно, а complete
не мешает наблюдателям получать новые данные. Это может работать для одиночного next
/error
, проблема в том, что next
или error
можно вызвать несколько раз непреднамеренно.
Причина использования first()
заключается в том, что subscribe
являются одноразовыми подписками, и я хотел бы удалить их, чтобы избежать утечек.
Как это должно быть реализовано с помощью наблюдаемых RxJS?
Ответы
Ответ 1
Вероятно, вы ищете Rx.ReplaySubject(1)
(или Rx.AsyncSubject()
в зависимости от вашего варианта использования).
Более подробное объяснение предметов см. в Что такое семантика разных объектов RxJS?.
В принципе, субъект может передаваться по ссылке, например, отложенной. Вы можете испускать значения (разрешение будет 'next'
(Rxjs v5) или 'onNext'
(Rxjs v4), за которым следуют 'complete'
или 'onCompleted()'
), если вы держите эту ссылку.
У вас может быть любое количество подписчиков на предмет, похожий на then
на отложенный. Если вы используете replaySubject(1)
, все подписчики получат последнее испущенное значение, которое должно ответить на ваш it doesn't matter if deferred.promise was settled before chaining with then or after.
. В Rxjs v4 a replaySubject
будет передавать свое последнее значение подписчику подписчика после его завершения. Я не уверен в поведении в Rxjs v5.
Update
Следующий код, выполненный с помощью Rxjs v4:
var subject = new Rx.AsyncSubject();
var deferred = subject;
deferred.subscribe(
console.log.bind(console, 'First result'),
console.log.bind(console, 'First error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Second result'),
console.log.bind(console, 'Second error')
);
});
subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');
выводит следующий результат:
First result one
Second result one
Однако тот же код, выполненный с Rxjs v5 не:
First result one
Second result four
Итак, в основном это означает, что семантика предметов изменилась в Rxjs v5!!! Это действительно потрясающее изменение, о котором нужно знать. В любом случае, вы можете подумать о возвращении к Rxjs v4 или использовать поворот, предложенный артуром grzesiak в его ответе. Вы также можете указать проблему на сайте github. Я бы поверила, что это изменение намеренно, но в том случае, когда это не так, подача вопроса может помочь прояснить ситуацию. В любом случае, независимо от того, какое поведение выбрано, необходимо обязательно задокументировать.
В вопросе о семантике предметов есть ссылка, показывающая асинхронный объект по отношению к multiple и поздняя подписка
Ответ 2
Поскольку @user3743222 написал AsyncSubject
, возможно, использовался в реализации deferred
, но дело в том, что оно должно быть private
и защищено от нескольких resolve
/reject
s.
Ниже приведена возможная структура зеркалирования реализации resolve-reject-promise
:
const createDeferred = () => {
const pending = new Rx.AsyncSubject(); // caches last value / error
const end = (result) => {
if (pending.isStopped) {
console.warn('Deferred already resloved/rejected.'); // optionally throw
return;
}
if (result.isValue) {
pending.next(result.value);
pending.complete();
} else {
pending.error(result.error);
}
}
return {
resolve: (value) => end({isValue: true, value: value }),
reject: (error) => end({isValue: false, error: error }),
observable: pending.asObservable() // hide subject
};
}
// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));
// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
def.resolve(1);
setTimeout(() => {
def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
def.resolve(2); // warn
def.reject('err'); // warn
}, 1000)
}, 1000);
// async error example
const def3 = createDeferred();
def3.observable.subscribe(
(n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
(err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
def3.reject('ERR');
setTimeout(() => {
def3.observable.subscribe(
(n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
(err) => console.error('ERROR-AFTER-REJECTED', err));
def3.resolve(2); // warn
def3.reject('err'); // warn
}, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>