Rxjava: Можно ли использовать функцию retry(), но с задержкой?
Я использую rxjava в своем Android-приложении для асинхронной обработки сетевых запросов. Теперь я хотел бы повторить неудачный сетевой запрос только через некоторое время.
Можно ли использовать функцию retry() в Observable, но повторить только после некоторой задержки?
Есть ли способ сообщить знанию Observable, что в настоящее время повторно (в отличие от попыток в первый раз)?
Я посмотрел на debounce()/throttleWithTimeout(), но они, похоже, делают что-то другое.
Edit:
Я думаю, что нашел один способ сделать это, но мне было бы интересно подтвердить, что это правильный способ сделать это или для других, лучших способов.
Что я делаю, так это: В методе call() моего Observable.OnSubscribe, прежде чем я вызову метод Subscribers onError(), я просто разрешаю Thread sleep в течение необходимого количества времени. Итак, чтобы повторить каждые 1000 миллисекунд, я делаю что-то вроде этого:
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
Так как этот метод работает на потоке ввода-вывода, он не блокирует пользовательский интерфейс. Единственная проблема, которую я вижу, заключается в том, что даже первая ошибка сообщается с задержкой, поэтому задержка существует, даже если нет retry(). Я бы хотел, чтобы это было лучше, если задержка не была применена после ошибки, но вместо этого перед повторением (но не до первой попытки, очевидно).
Ответы
Ответ 1
Вы можете использовать оператор retryWhen()
, чтобы добавить логику повтора к любому наблюдаемому.
Следующий класс содержит логику повтора:
RxJava 2.x
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Function<Throwable, Observable<?>>() {
@Override
public Observable<?> apply(final Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
RxJava 1.x
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
Использование:
// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
.retryWhen(new RetryWithDelay(3, 2000));
Ответ 2
Вдохновленный ответом Пола, и если вас не волнуют retryWhen
проблемы, указанные Абхиджитом Саркаром, самый простой способ отложить повторную подписку с помощью rxJava2 безоговорочно:
source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
Возможно, вы захотите увидеть больше примеров и объяснений при повторных попытках и повторении при.
Ответ 3
Это решение, основанное на фрагментах Бена Кристенсена, которые я видел, RetryWhen Example и RetryWhenTestsConditional (I пришлось изменить n.getThrowable()
на n
, чтобы он работал). Я использовал evant/gradle-retrolambda, чтобы сделать лямбда-нотацию на Android, но вам не нужно использовать лямбда (хотя это очень рекомендуется). Для задержки я реализовал экспоненциальный откат, но вы можете подключить любую логику возврата, которую вы хотите там. Для полноты я добавил операторы subscribeOn
и observeOn
. Я использую ReactiveX/RxAndroid для AndroidSchedulers.mainThread()
.
int ATTEMPT_COUNT = 10;
public class Tuple<X, Y> {
public final X x;
public final Y y;
public Tuple(X x, Y y) {
this.x = x;
this.y = y;
}
}
observable
.subscribeOn(Schedulers.io())
.retryWhen(
attempts -> {
return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
.flatMap(
ni -> {
if (ni.y > ATTEMPT_COUNT)
return Observable.error(ni.x);
return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
});
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Ответ 4
вместо использования MyRequestObservable.retry Я использую функцию-оболочку retryObservable (MyRequestObservable, retrycount, seconds), которые возвращают новый Observable, которые обрабатывают косвенную связь для задержки, поэтому я могу сделать
retryObservable(restApi.getObservableStuff(), 3, 30)
.subscribe(new Action1<BonusIndividualList>(){
@Override
public void call(BonusIndividualList arg0)
{
//success!
}
},
new Action1<Throwable>(){
@Override
public void call(Throwable arg0) {
// failed after the 3 retries !
}});
// wrapper code
private static <T> Observable<T> retryObservable(
final Observable<T> requestObservable, final int nbRetry,
final long seconds) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
requestObservable.subscribe(new Action1<T>() {
@Override
public void call(T arg0) {
subscriber.onNext(arg0);
subscriber.onCompleted();
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
if (nbRetry > 0) {
Observable.just(requestObservable)
.delay(seconds, TimeUnit.SECONDS)
.observeOn(mainThread())
.subscribe(new Action1<Observable<T>>(){
@Override
public void call(Observable<T> observable){
retryObservable(observable,
nbRetry - 1, seconds)
.subscribe(subscriber);
}
});
} else {
// still fail after retries
subscriber.onError(error);
}
}
});
}
});
}
Ответ 5
Этот пример работает с jxjava 2.2.2:
Повторите попытку без задержки:
Single.just(somePaylodData)
.map(data -> someConnection.send(data))
.retry(5)
.doOnSuccess(status -> log.info("Yay! {}", status);
Повторите попытку с задержкой:
Single.just(somePaylodData)
.map(data -> someConnection.send(data))
.retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
.doOnSuccess(status -> log.info("Yay! {}", status)
.doOnError((Throwable error)
-> log.error("I tried five times with a 300ms break"
+ " delay in between. But it was in vain."));
Наш исходный сингл терпит неудачу, если не работает someConnection.send(). Когда это происходит, наблюдаемые сбои внутри retryWhen выдают ошибку. Мы задерживаем это излучение на 300 мс и отправляем его обратно, чтобы сообщить о повторной попытке. take (5) гарантирует, что наша наблюдаемая сигнализация прекратится после того, как мы получим пять ошибок. retryWhen видит завершение и не повторяет попытку после пятого сбоя.
Ответ 6
retryWhen
является сложным, возможно, даже багги-оператором. В официальном документе и, по крайней мере, в одном ответе здесь используется оператор range
, который не будет выполнен, если повторных попыток не будет. См. Мой обсуждение с членом ReactiveX Дэвидом Карноком.
Я улучшил ответ на kjones, изменив flatMap
на concatMap
и добавив класс RetryDelayStrategy
. flatMap
не сохраняет порядок излучения, а concatMap
, что важно для задержек с отклонением. RetryDelayStrategy
, как указывает имя, позволяет пользователю выбирать из различных режимов создания задержек повторных попыток, включая откат.
Код доступен на моем GitHub в комплекте со следующими тестовыми примерами:
- Превышает 1-ю попытку (нет попыток)
- Сбой после 1 попытки повтора
- Попытка повторить попытку 3 раза, но с успехом на 2-й, следовательно, не повторяет третий раз
- Преуспевает в 3-й попытке
См. метод setRandomJokes
.
Ответ 7
Теперь, используя RxJava версии 1.0+, вы можете использовать zipWith для достижения повторной попытки с задержкой.
Добавление изменений в kjones.
Modified
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int MAX_RETRIES;
private final int DELAY_DURATION;
private final int START_RETRY;
/**
* Provide number of retries and seconds to be delayed between retry.
*
* @param maxRetries Number of retries.
* @param delayDurationInSeconds Seconds to be delays in each retry.
*/
public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
MAX_RETRIES = maxRetries;
DELAY_DURATION = delayDurationInSeconds;
START_RETRY = 1;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.delay(DELAY_DURATION, TimeUnit.SECONDS)
.zipWith(Observable.range(START_RETRY, MAX_RETRIES),
new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attempt) {
return attempt;
}
});
}
}
Ответ 8
Тот же ответ, что и у kjones, но обновлен до последней версии Для версии RxJava 2.x: ('io.reactivex.rxjava2: rxjava: 2.1.3')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {
private final int maxRetries;
private final long retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Flowable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Flowable.error(throwable);
}
});
}
}
Использование:
//Добавить логику повторения к существующей наблюдаемой. // Повторить максимум 3 раза с задержкой в 2 секунды.
observable
.retryWhen(new RetryWithDelay(3, 2000));
Ответ 9
Вы можете добавить задержку в Observable, возвращенную в retryWhen Operator
/**
* Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
*/
@Test
public void observableOnErrorResumeNext() {
Subscription subscription = Observable.just(null)
.map(Object::toString)
.doOnError(failure -> System.out.println("Error:" + failure.getCause()))
.retryWhen(errors -> errors.doOnNext(o -> count++)
.flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
Schedulers.newThread())
.onErrorResumeNext(t -> {
System.out.println("Error after all retries:" + t.getCause());
return Observable.just("I save the world for extinction!");
})
.subscribe(s -> System.out.println(s));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}
Здесь вы можете увидеть больше примеров. https://github.com/politrons/reactive
Ответ 10
Для версии Kotlin & RxJava1
class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
: Function1<Observable<out Throwable>, Observable<*>> {
private val START_RETRY: Int = 1
override fun invoke(observable: Observable<out Throwable>): Observable<*> {
return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
.zipWith(Observable.range(START_RETRY, MAX_RETRIES),
object : Function2<Throwable, Int, Int> {
override fun invoke(throwable: Throwable, attempt: Int): Int {
return attempt
}
})
}
}
Ответ 11
(Kotlin) Я немного улучшил код с экспоненциальным откатом и применил защитное излучение Observable.range():
fun testOnRetryWithDelayExponentialBackoff() {
val interval = 1
val maxCount = 3
val ai = AtomicInteger(1);
val source = Observable.create<Unit> { emitter ->
val attempt = ai.getAndIncrement()
println("Subscribe ${attempt}")
if (attempt >= maxCount) {
emitter.onNext(Unit)
emitter.onComplete()
}
emitter.onError(RuntimeException("Test $attempt"))
}
// Below implementation of "retryWhen" function, remove all "println()" for real code.
val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
throwableRx.doOnNext({ println("Error: $it") })
.zipWith(Observable.range(1, maxCount)
.concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
)
.flatMap { pair ->
if (pair.second >= maxCount) {
Observable.error(pair.first)
} else {
val delay = interval * 2F.pow(pair.second)
println("retry delay: $delay")
Observable.timer(delay.toLong(), TimeUnit.SECONDS)
}
}
}
//Code to print the result in terminal.
sourceWithRetry
.doOnComplete { println("Complete") }
.doOnError({ println("Final Error: $it") })
.blockingForEach { println("$it") }
}
Ответ 12
в случае, когда вам нужно распечатать счет повторов, вы можете использовать пример, представленный на вики-странице Rxjava https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators
observable.retryWhen(errors ->
// Count and increment the number of errors.
errors.map(error -> 1).scan((i, j) -> i + j)
.doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
// Limit the maximum number of retries.
.takeWhile(errorCount -> errorCount < retryCounts)
// Signal resubscribe event after some delay.
.flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
Ответ 13
Основываясь на ответе kjones, здесь приведена версия RxJava 2.x Kotlin с повтором с задержкой в качестве расширения. Замените Observable
, чтобы создать такое же расширение для Flowable
.
fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
var retryCount = 0
return retryWhen { thObservable ->
thObservable.flatMap { throwable ->
if (++retryCount < maxRetries) {
Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
} else {
Observable.error(throwable)
}
}
}
}
Тогда просто используйте его на наблюдаемом observable.retryWithDelay(3, 1000)
Ответ 14
Просто сделайте следующее:
Observable.just("")
.delay(2, TimeUnit.SECONDS) //delay
.flatMap(new Func1<String, Observable<File>>() {
@Override
public Observable<File> call(String s) {
L.from(TAG).d("postAvatar=");
File file = PhotoPickUtil.getTempFile();
if (file.length() <= 0) {
throw new NullPointerException();
}
return Observable.just(file);
}
})
.retry(6)
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
postAvatar(file);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});