Ответ 1
ТЛ; др; mergeMap
намного эффективнее map
. Понимание mergeMap
является необходимым условием для доступа к полной мощности Rx.
сходство
-
как
mergeMap
и действияmap
в одном потоке (противzip
,combineLatest
) -
и
mergeMap
иmap
могут преобразовывать элементы потока (противfilter
,delay
)
различия
карта
-
не может изменить размер исходного потока (предположение: сама
map
неthrow
); для каждого элемента из источника испускается только одинmapped
элемент;map
не может игнорировать элементы (например,filter
); -
в случае планировщика по умолчанию преобразование происходит синхронно; чтобы быть на 100% понятным: исходный поток может передавать свои элементы асинхронно, но каждый следующий элемент сразу
mapped
и повторно испускается;map
не может сдвигать элементы во времени, например,delay
-
нет ограничений на возвращаемые значения
-
id
:x => x
mergeMap
-
может изменить размер потока источника; для каждого элемента может быть произвольное число (0, 1 или много) новых элементов, созданных/испущенных
-
он обеспечивает полный контроль над асинхронностью - как при создании/исходе новых элементов, так и о том, сколько элементов из потока источника должно обрабатываться одновременно; например, предположим, что исходный поток испускает 10 элементов, но
maxConcurrency
установлен на 2, тогда два первых элемента будут обрабатываться немедленно, а остальные 8 буферизуются; как только один из обработанногоcomplete
d будет обработан следующий элемент из потока источника и так далее - это немного сложно, но посмотрите на пример ниже -
все остальные операторы могут быть реализованы с помощью только конструктора
mergeMap
иObservable
-
может использоваться для рекурсивных асинхронных операций
-
возвращаемые значения должны быть типа Observable (или Rx должен знать, как создавать наблюдаемые из него - например, обещание, массив)
-
id
:x => Rx.Observable.of(x)
массивная аналогия
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
Аналогия не показывает полную картину, и она в основном соответствует .mergeMap
с maxConcurrency
установленным в 1. В таком случае элементы будут упорядочены, как указано выше, но в общем случае это не обязательно так. Единственная гарантия, которую мы имеем, заключается в том, что эмиссия новых элементов будет упорядочена по их позиции в базовом потоке. Например: [3,1,2,4,9,1]
и [2,3,1,1,9,4]
действительны, но [1,1,4,2,3,9]
не является ( поскольку 4
был испущен после 2
в нижележащем потоке).
Несколько примеров использования mergeMap
:
// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>