Как я могу сделать одну последовательность наблюдений RxJS ожидающей, пока кто-то завершит ее, прежде чем исправить?
Скажем, у меня есть Наблюдаемый, например:
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
Тогда у меня есть вторая наблюдаемая:
var two = someOtherObservable.take(1);
Теперь я хочу подписаться на two
, но я хочу убедиться, что one
завершено до того, как абонент two
уволен. Какой метод буферизации я могу использовать на two
, чтобы второй ожидал завершения первого?
Я предполагаю, что я хочу приостановить two
до завершения one
.
Ответы
Ответ 1
Несколько способов, которые я могу придумать
//Method one
var one = someObservable.take(1);
var two = someOtherObservable.take(1);
one.concat(two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.take(1);
var two = someOtherObservable.take(1).publish();
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
Ответ 2
skipUntil() с последним()
skipUntil: игнорировать испускаемые элементы, пока другая наблюдаемая не испустит
last: выдать последнее значение из последовательности (т.е. подождать, пока оно не завершится, затем произвести)
Обратите внимание, что все, что skipUntil
из наблюдаемой, переданной в skipUntil
, отменяет пропуск, поэтому нам нужно добавить last()
- дождаться завершения потока.
main$.skipUntil(sequence2$.pipe(last()))
Официально: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Возможная проблема: обратите внимание, что last()
сама по себе выдаст ошибку, если ничего не генерируется. Оператор last()
имеет параметр по default
но только при использовании вместе с предикатом. Я думаю, если эта ситуация является проблемой для вас (если sequence2$
может завершиться без выдачи), то один из них должен работать (в настоящее время не проверено):
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Обратите внимание, что undefined
является допустимым элементом для отправки, но на самом деле может иметь любое значение. Также обратите внимание, что это канал, присоединенный к sequence2$
а не main$
pipe.
Ответ 3
Если вы хотите убедиться, что порядок выполнения сохранен, вы можете использовать flatMap в следующем примере
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
Результат:
"1"
"11"
"111"
"finished"
Ответ 4
Если вторая наблюдаемая горячая, есть другой способ сделать паузу/возобновление:
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);
source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});
source2.subscribe(function(){
// start to recieve data
});
Также вы можете использовать буферизованную версию pausableBuffered для сохранения данных во время паузы.
Ответ 5
Вот еще одна возможность воспользоваться селектором результатов switchMap
var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});
Поскольку селектор результатов switchMap устарел, вот обновленная версия
const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
take(1),
switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
/* do something */
});
Ответ 6
Здесь еще один, но я чувствую более прямой и интуитивный (или, по крайней мере, естественный, если вы привыкли к Обещаниям) подход. По сути, вы создаете Observable, используя Observable.create()
чтобы обернуть one
и two
в один Observable. Это очень похоже на то, как может работать Promise.all()
.
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
// observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
});
Итак, что здесь происходит? Сначала мы создаем новый Observable. Функция, передаваемая в Observable.create()
, метко названная onSubscription
, передается наблюдателю (построенному из параметров, которые вы передаете в subscribe()
), что аналогично resolve
и reject
объединенным в один объект при создании нового Promise. Вот как мы заставляем магию работать.
В onSubscription
мы подписываемся на первый Observable (в приведенном выше примере он назывался one
). Как мы поступим next
и error
будет зависеть от вас, но, по большому счету, значение по умолчанию в моем образце должно быть уместным. Однако, когда мы получим complete
событие, что означает, что one
уже выполнено, мы можем подписаться на следующее наблюдаемое; тем самым запуская вторую Наблюдаемую после завершения первой.
Пример наблюдателя для второго Observable довольно прост. По сути, second
теперь действует так, как вы ожидаете, что two
будут действовать как в ОП. Более конкретно, second
будет выдавать первое и единственное первое значение, испускаемое someOtherObservable
(из-за take(1)
), а затем завершаться, предполагая, что ошибки нет.
пример
Вот полный рабочий пример, который вы можете скопировать/вставить, если хотите, чтобы мой пример работал в реальной жизни:
var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
}).subscribe(
function onNext(value) {
console.log(value);
},
function onError(error) {
console.error(error);
},
function onComplete() {
console.log("Done!");
}
);
Если вы смотрите консоль, приведенный выше пример напечатает:
1
6
Готово!
Ответ 7
Вот способ многократного использования (это машинопись, но вы можете адаптировать ее к js):
export function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) =>
new Observable<T>(observer =>
signal.pipe(first())
.subscribe(_ =>
source.subscribe(observer)
)
);
}
и вы можете использовать его как любой оператор:
var two = someOtherObservable.pipe(waitFor(one), take(1));
Это в основном оператор, который откладывает подписку на наблюдаемый источник, пока наблюдаемый сигнал не испустит первое событие.
Ответ 8
Вы можете использовать результат, полученный от предыдущего Observable благодаря оператору mergeMap (или его псевдоним flatMap), например:
const one = Observable.of('https://api.github.com/users');
const two = (c) => ajax(c);//ajax from Rxjs/dom library
one.mergeMap(two).subscribe(c => console.log(c))
Ответ 9
Вот довольно интуитивная и простая реализация в RxSwift, которая должна быть легко переведена в RxJS:
Observable.never().takeUntil(signalingObservable).concat(delayedObservable)
Ответ 10
ну, я знаю, что это довольно старый, но я думаю, что вам может понадобиться:
var one = someObservable.take(1);
var two = someOtherObservable.pipe(
concatMap((twoRes) => one.pipe(mapTo(twoRes))),
take(1)
).subscribe((twoRes) => {
// one is completed and we get two subscription.
})