Ошибка с Flowable onErrorResumeNext, networkOnMainThread
У меня есть следующая цепочка rxJava:
override fun combineLocationToPlace(req: Flowable<Place>): Flowable<Place> {
var combinedFlowable = Flowable
.combineLatest(
req,
getLastLocation().lastOrError().toFlowable(),
BiFunction<Place, Location, Place> { t1, location ->
Timber.w("FIRSTINIT - Retrieved location $location")
var placeLocation = Location(t1.placeName)
placeLocation.latitude = t1.latitude
placeLocation.longitude = t1.longitude
t1.distance = location.distanceTo(placeLocation)
t1
})
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
return combinedFlowable
}
Короче говоря, я извлекаю некоторые данные из локальной базы данных (место), и я хочу объединить его с последним местоположением из GPS:
override fun getLastLocation(requestIfEmpty: Boolean): Observable<Location> {
var lastLocation = locationProvider.lastKnownLocation
.doOnNext {
Timber.w("Got location $it from last one")
}
.doOnComplete {
Timber.w("did i get a location?")
}
if (requestIfEmpty) {
Timber.w("Switching to request of location")
lastLocation = lastLocation.switchIfEmpty(requestLocation())
}
return lastLocation.doOnNext {
Timber.w("Got something!")
location = it
}
}
Но я хочу учитывать scneario, где у пользователя нет последнего местоположения, и, следовательно, строка:
return combinedFlowable
.onErrorResumeNext { t: Throwable ->
Timber.w(t, "FIRSTINIT - Could not retrieve location for place (${t.message}) returning original request")
req
}
.doOnError {
Timber.w("FIRSTINIT - did detect the error here...")
}
Который пытается поймать любую ошибку и повторить попытку только с первоначальным запросом, не объединяя его ни с чем. Я вызываю этот код следующим образом:
fun getPlace(placeId: String) {
locationManager.combineLocationToPlace(placesRepository.getPlace(placeId))
.onErrorResumeNext { t: Throwable ->
Timber.e(t, "Error resuming next! ")
placesRepository.getPlace(placeId)
}.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
.subscribeBy(
onNext = {
place.value = Result.success(it)
},
onError = {
Timber.e("ERROR! $it")
place.value = Result.failure(it)
}
)
.addTo(disposables)
}
Однако, когда нет места, NoSuchElementException
, мои текущие переключаются на исходный запрос, а затем после его выполнения я получаю исключение NetworkOnMainThread
. Разве этот запрос не должен выполняться в scheduler.io()
который я вставлял туда (с тех пор, как я поставил код перед этим)?
Если вам интересно, schedulerProvider.io()
переводит на:
Schedulers.io()
GetPlace:
/**
* Retrieves a single place from database
*/
override fun getPlace(id: String): Flowable<Place> {
return Flowable.merge(placesDao.getPlace(id),
refreshPlace(id).toFlowable())
}
/**
* Triggers a refreshPlace update on the db, useful when changing stuff associated with the place
* itself indirectly (e.g. an experience)
*/
private fun refreshPlace(id: String): Single<Place> {
return from(placesApi.getPlace(id))
.doOnSuccess {
placesDao.savePlace(it)
}
}
Ответы
Ответ 1
Чтобы убедиться, что ваша сеть отключена от основного потока, отправьте его в фоновый поток явно.
Используйте планировщики ввода-вывода, новой темы или вычислений из класса Rx Schedulers:
subscribeOn(Schedulers.computation())
Для случаев, когда вы не хотите этого делать (или, если вы считаете, что он уже должен быть в фоновом потоке и просто хотите отлаживать), вы можете записывать информацию о потоке следующим образом:
Thread.currentThread().getName()
Это может быть особенно полезно для отслеживания того, что происходит, когда вы используете либо Schedulers.trampoline()
если у вас есть разные планировщики для наблюдения и подписки, как в вашем примере:
.subscribeOn(schedulerProvider.io()).observeOn(schedulerProvider.ui())
Ответ 2
Вы не можете использовать onErrorResumeNext для реализации этой функциональности. Вы должны использовать retryWhen оператора.
Возможно, этот пост полезен для вас.
https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a
этот код опроса сервера с отсрочкой до получения кода отличается от 204. Может быть, вы можете адаптироваться к вашим потребностям, используя retryWhen вместо repeatWhen.
fun pollServerWithBackoff(videoId: String, maxAttempts: Int, delay: Int): Flowable<Response<ResponseBody>> {
return api.download(videoId)
.subscribeOn(Schedulers.io())
.repeatWhen {
it
.zipWith(Flowable.range(1, maxAttempts),
BiFunction { _: Any?, attempt: Int -> attempt })
.flatMap {
Flowable.timer((it * delay).toLong(), TimeUnit.SECONDS);
}
}
.takeUntil({
it.code() != 204
})
.filter {
it.code() != 204
}
.map{
if(it.code() in 200..300)
it
else
throw IOException(it.errorBody()?.toString() ?: "Unkown Error")
}
}