RxJava: цепные наблюдаемые
Можно ли реализовать что-то вроде следующего цепочки с помощью RxJava:
loginObservable()
.then( (someData) -> {
// returns another Observable<T> with some long operation
return fetchUserDataObservable(someData);
}).then( (userData) -> {
// it should be called when fetching user data completed (with userData of type T)
cacheUserData(userData);
}).then( (userData) -> {
// it should be called after all previous operations completed
displayUserData()
}).doOnError( (error) -> {
//do something
})
Я нашел эту библиотеку очень интересной, но не могу понять, как цепочка запросов, где друг от друга зависит от предыдущих.
Ответы
Ответ 1
Конечно, RxJava поддерживает .map
, который делает это. Из RxJava Wiki:
![map]()
В принципе, это будет:
loginObservable()
.switchMap( someData -> fetchUserDataObservable(someData) )
.map( userData -> cacheUserData(userData) )
.subscribe(new Subscriber<YourResult>() {
@Override
public void onCompleted() {
// observable stream has ended - no more logins possible
}
@Override
public void onError(Throwable e) {
// do something
}
@Override
public void onNext(YourType yourType) {
displayUserData();
}
});
Ответ 2
Это верхний пост, когда наблюдаются цепочки Googling RxJava, поэтому я просто добавлю еще один распространенный случай, когда вы не захотите преобразовывать полученные данные, но добавьте в цепочку другое действие (например, настройку данных для базы данных). Используйте .flatmap()
. Вот пример:
mDataManager
.fetchQuotesFromApi(limit)
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
// OnErrorResumeNext and Observable.error() would propagate the error to
// the next level. So, whatever error occurs here, would get passed to
// onError() on the UI side.
.onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
.flatMap { t: List<Quote> ->
// Chain observable as such
mDataManager.setQuotesToDb(t).subscribe(
{},
{ e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
{ d { "Done server set" } }
)
Observable.just(t)
}
.subscribeBy(
onNext = {},
onError = { mvpView?.showError("No internet connection") },
onComplete = { d { "onComplete(): done with fetching quotes from api" } }
)
Это RxKotlin2, но идея аналогична RxJava & RxJava2:
Краткое объяснение:
- мы пытаемся получить некоторые данные (цитаты в этом примере) из API с
mDataManager.fetchQuotesFromApi()
- Мы подписываем наблюдаемое, чтобы делать что-то в потоке
.io()
и показывать результаты в потоке .ui()
.
onErrorResumeNext()
гарантирует, что любая ошибка, с которой мы столкнемся при получении данных, будет обнаружена в этом методе. Я хочу завершить всю цепочку, когда там есть ошибка, поэтому я возвращаю Observable.error()
.flatmap()
является частью цепочки. Я хочу иметь возможность устанавливать любые данные, которые я получаю из API, в свою базу данных. Я не преобразую данные, полученные с помощью .map()
, я просто делаю что-то еще с этими данными без их преобразования.
- Я подписываюсь на последнюю цепочку наблюдаемых. Если при извлечении данных произошла ошибка (первая наблюдаемая), она будет обработана (в этом случае распространена на подписанный
onError()
) с помощью onErrorResumeNext()
- Я очень хорошо понимаю, что подписываюсь на наблюдаемую БД (внутри
flatmap()
). Любая ошибка, возникающая из-за этой наблюдаемой, НЕ будет распространяться на последние методы subscribeBy()
, поскольку она обрабатывается внутри метода subscribe()
внутри цепочки .flatmap()
.
Код взят из этого проекта, который находится здесь: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt
Ответ 3
попробуйте использовать scan()
Flowable.fromArray(array).scan(...).subscribe(...)