Как я могу сделать одну последовательность наблюдений RxJS ожидающей, пока кто-то завершит ее, прежде чем исправить?

Скажем, у меня есть Наблюдаемый, например:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Тогда у меня есть вторая наблюдаемая:

var two = someOtherObservable.take(1);

Теперь я хочу подписаться на two, но я хочу убедиться, что one завершено до того, как абонент two уволен. Какой метод буферизации я могу использовать на two, чтобы второй ожидал завершения первого?

Я предполагаю, что я хочу приостановить two до завершения one.

Ответы

Ответ 1

Несколько способов, которые я могу придумать

//Method one
var one = someObservable.take(1);
var two = someOtherObservable.take(1);
one.concat(two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.take(1);
var two = someOtherObservable.take(1).publish();
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));

Ответ 2

skipUntil() с последним()

skipUntil: игнорировать испускаемые элементы, пока другая наблюдаемая не испустит

last: выдать последнее значение из последовательности (т.е. подождать, пока оно не завершится, затем произвести)

Обратите внимание, что все, что skipUntil из наблюдаемой, переданной в skipUntil, отменяет пропуск, поэтому нам нужно добавить last() - дождаться завершения потока.

main$.skipUntil(sequence2$.pipe(last()))

Официально: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Возможная проблема: обратите внимание, что last() сама по себе выдаст ошибку, если ничего не генерируется. Оператор last() имеет параметр по default но только при использовании вместе с предикатом. Я думаю, если эта ситуация является проблемой для вас (если sequence2$ может завершиться без выдачи), то один из них должен работать (в настоящее время не проверено):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

Обратите внимание, что undefined является допустимым элементом для отправки, но на самом деле может иметь любое значение. Также обратите внимание, что это канал, присоединенный к sequence2$ а не main$ pipe.

Ответ 3

Если вы хотите убедиться, что порядок выполнения сохранен, вы можете использовать flatMap в следующем примере

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

Результат:

"1"
"11"
"111"
"finished"

Ответ 4

Если вторая наблюдаемая горячая, есть другой способ сделать паузу/возобновление:

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

Также вы можете использовать буферизованную версию pausableBuffered для сохранения данных во время паузы.

Ответ 5

Вот еще одна возможность воспользоваться селектором результатов switchMap

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

Поскольку селектор результатов switchMap устарел, вот обновленная версия

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});

Ответ 6

Здесь еще один, но я чувствую более прямой и интуитивный (или, по крайней мере, естественный, если вы привыкли к Обещаниям) подход. По сути, вы создаете Observable, используя Observable.create() чтобы обернуть one и two в один Observable. Это очень похоже на то, как может работать Promise.all().

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

Итак, что здесь происходит? Сначала мы создаем новый Observable. Функция, передаваемая в Observable.create(), метко названная onSubscription, передается наблюдателю (построенному из параметров, которые вы передаете в subscribe()), что аналогично resolve и reject объединенным в один объект при создании нового Promise. Вот как мы заставляем магию работать.

В onSubscription мы подписываемся на первый Observable (в приведенном выше примере он назывался one). Как мы поступим next и error будет зависеть от вас, но, по большому счету, значение по умолчанию в моем образце должно быть уместным. Однако, когда мы получим complete событие, что означает, что one уже выполнено, мы можем подписаться на следующее наблюдаемое; тем самым запуская вторую Наблюдаемую после завершения первой.

Пример наблюдателя для второго Observable довольно прост. По сути, second теперь действует так, как вы ожидаете, что two будут действовать как в ОП. Более конкретно, second будет выдавать первое и единственное первое значение, испускаемое someOtherObservable (из-за take(1)), а затем завершаться, предполагая, что ошибки нет.

пример

Вот полный рабочий пример, который вы можете скопировать/вставить, если хотите, чтобы мой пример работал в реальной жизни:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

Если вы смотрите консоль, приведенный выше пример напечатает:

1

6

Готово!

Ответ 7

Вот способ многократного использования (это машинопись, но вы можете адаптировать ее к js):

export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

и вы можете использовать его как любой оператор:

var two = someOtherObservable.pipe(waitFor(one), take(1));

Это в основном оператор, который откладывает подписку на наблюдаемый источник, пока наблюдаемый сигнал не испустит первое событие.

Ответ 8

Вы можете использовать результат, полученный от предыдущего Observable благодаря оператору mergeMap (или его псевдоним flatMap), например:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))

Ответ 9

Вот довольно интуитивная и простая реализация в RxSwift, которая должна быть легко переведена в RxJS:

Observable.never().takeUntil(signalingObservable).concat(delayedObservable)

Ответ 10

ну, я знаю, что это довольно старый, но я думаю, что вам может понадобиться:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two subscription.
})