rxjs 5 publishReplay refCount
Я не могу понять, как publishReplay().refCount()
.
Например (https://jsfiddle.net/7o3a45L1/):
var source = Rx.Observable.create(observer => {
console.log("call");
// expensive http request
observer.next(5);
}).publishReplay().refCount();
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
console.log("");
subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)});
subscription2.unsubscribe();
console.log("");
subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)});
subscription3.unsubscribe();
console.log("");
subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)});
subscription4.unsubscribe();
дает следующий результат:
call observerA: 5
observerB: 5 вызов observerB: 5
observerC: 5 observerC: 5 call observerC: 5
observerD: 5 observerD: 5 observerD: 5 call observerD: 5
1) Почему observerB, C и D называются многократно?
2) Почему "вызов" печатается на каждой строке, а не в начале строки?
Кроме того, если я вызываю publishReplay(1).refCount()
, он вызывает observerB, C и D 2 раза каждый.
Я ожидаю, что каждый новый наблюдатель получает значение 5 ровно один раз, а "вызов" печатается только один раз.
Ответы
Ответ 1
publishReplay(x).refCount()
вместе выполняет следующее:
- Он создает
ReplaySubject
который воспроизводит до х выбросов. Если x не определен, он повторяет полный поток. - Он делает эту многоадресную
ReplaySubject
совместимой с помощью оператора refCount(). Это приводит к одновременным подписям, получающим одни и те же выбросы.
В вашем примере содержится несколько проблем, помутняющих, как все это работает. См. Следующий отредактированный фрагмент:
var state = 5
var realSource = Rx.Observable.create(observer => {
console.log("creating expensive HTTP-based emission");
observer.next(state++);
// observer.complete();
return () => {
console.log('unsubscribing from source')
}
});
var source = Rx.Observable.of('')
.do(() => console.log('stream subscribed'))
.ignoreElements()
.concat(realSource)
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount()
;
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
subscription3 = source.subscribe(v => console.log('observerC: ' + v));
subscription3.unsubscribe();
subscription4 = source.subscribe(v => console.log('observerD: ' + v));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>
Ответ 2
Как правило: refCount
означает, что поток горячий/общий, если есть хотя бы один абонент, однако он перезагружается/холоден, когда нет подписчиков.
Это означает, что если вы хотите быть абсолютно уверены, что ничего не выполняется более одного раза, вы не должны использовать refCount()
а просто connect
поток, чтобы установить его горячим.
В качестве дополнительной заметки: если вы добавите observer.complete()
после observer.next(5);
вы также получите ожидаемый результат.
Sidenote: Вам действительно нужно создать свой собственный пользовательский Obervable
здесь? В 95% случаев существующие операторы достаточны для данной usecase.
Ответ 3
Это происходит потому, что вы используете publishReplay()
. Он внутренне создает экземпляр ReplaySubject
который хранит все значения, которые проходят.
Поскольку вы используете Observable.create
где вы испускаете одно значение, то каждый раз, когда вы вызываете source.subscribe(...)
вы добавляете одно значение в буфер в ReplaySubject
.
Вы не получаете call
напечатанный в начале каждой строки, потому что он ReplaySubject
который ReplaySubject
выдает свой буфер, когда вы подписываетесь, а затем подписывается на свой источник:
Подробности реализации см. В:
То же самое относится к использованию publishReplay(1)
. Сначала он испускает буферный элемент из ReplaySubject
а затем еще один элемент из observer.next(5);