RXJS Дождитесь завершения всех наблюдаемых в массиве (или ошибок)
Я помещаю наблюдаемые в массив, как такой...
var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));
Я хочу Observable, который выдает, когда все задачи $ завершены. Имейте в виду, что на практике у задач $ нет известного числа наблюдаемых.
Я пробовал Observable.zip(tasks$).subscribe()
но, похоже, это не работает в случае, если есть только 1 задача, и это наводит меня на мысль, что ZIP требует четного числа элементов для работы Я бы ожидал.
Я пробовал Observable.concat(tasks$).subscribe()
но результатом оператора concat, похоже, является массив наблюдаемых... например, в основном такой же, как ввод. Вы даже не можете позвонить подписаться на это.
В С# это было бы похоже на Task.WhenAll()
. В ES6 обещание будет похоже на Promise.all()
.
Я сталкивался с несколькими вопросами SO, но все они, похоже, имеют дело с ожиданием на известном количестве потоков (например, сопоставляя их).
Ответы
Ответ 1
Если вы хотите создать наблюдаемое, которое испускает, когда все исходные наблюдаемые завершены, вы можете использовать forkJoin
:
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';
var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
Ответ 2
Вы можете использовать zip
.
Объединяет несколько Observable для создания Observable, значения которого рассчитываются по значениям, по порядку, каждого из его входных Observables.
const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];
const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
result => console.log(result), // result is an array with the responses [respA, respB]
);
Что нужно учитывать:
Не должно быть четного числа наблюдаемых. zip
визуально
Как здесь сказано,
Оператор zip подпишется на все внутренние наблюдаемые, ожидая, пока каждая из них выдаст значение. Как только это произойдет, все значения с соответствующим индексом будут отправлены. Это будет продолжаться, пока не завершится хотя бы одна внутренняя наблюдаемая.
Когда одна из наблюдаемых выдает ошибку (или даже обе), подписка закрывается (вызывается onComplete
on complete), и с помощью метода onError
вы получаете только первую ошибку. zip.subscribe(
result => console.log(result), // result is an array with the responses [respA, respB]
error => console.log(error), // will return the error message of the first observable that throws error and then finish it
() => console.log ('completed after first error or if first observable finishes)
);
Ответ 3
Для меня это sample было лучшим решением.
const source = Observable.interval(500);
const example = source.sample(Observable.interval(2000));
const subscribe = example.subscribe(val => console.log('sample', val));
Итак, только когда второй (пример) испускает - вы увидите последнее испущенное значение первого (источника).
В моей задаче я жду подтверждения формы и другого события DOM.
Ответ 4
// waits for all Observables no matter of success/fails each of them
// returns array of items
// each item represent even first value of Observable or it error
export function waitAll(args: Observable<any>[]): Observable<any[]> {
const final = new Subject<any[]>();
const flags = new Array(args.length);
const result = new Array(args.length);
let total = args.length;
for (let i = 0; i < args.length; i++) {
flags[i] = false;
args[i].subscribe(
res => {
console.info('waitAll ' + i + ' ok ', res);
if (flags[i] === false) {
flags[i] = true;
result[i] = res;
total--;
if (total < 1) {
final.next(result);
}
}
},
error => {
console.error('waitAll ' + i + ' failed ', error);
if (flags[i] === false) {
flags[i] = true;
result[i] = error;
total--;
if (total < 1) {
final.next(result);
}
}
}
);
}
return final.asObservable();
}
unit тест:
describe('waitAll', () => {
it('should wait for all observables', async () => {
const o1 = new Subject();
const o2 = new Subject();
const o3 = new Subject();
const o = waitAll([o1, o2, o3]);
const res = {arr: []};
o.subscribe(result => res.arr = result, err => res.arr = []);
expect(res.arr).toEqual([]);
o1.next('success1');
expect(res.arr).toEqual([]);
o2.error('failed2')
expect(res.arr).toEqual([]);
o3.next('success3')
expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
o1.next('success1*');
expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
o2.error('failed2*')
expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
o3.next('success3*')
expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
});
});