Ошибка поймать, если retryWhen: s истекает
В документации для RetryWhen пример выглядит следующим образом:
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
Но как я могу распространять ошибку, если истечет срок повтора?
Добавление .doOnError(System.out::println)
после предложения retryWhen
не вызывает ошибки. Является ли оно даже испускаемым?
Добавление .doOnError(System.out::println)
перед повторной попыткой. Показывает always fails
для всех попыток.
Ответы
Ответ 1
Javadoc для retryWhen
заявляет, что:
Если этот Observable вызывает onComplete или onError, повторите попытку onCompleted или onError на дочерней подписке.
Проще говоря, если вы хотите распространять исключение, вам нужно будет восстановить исходное исключение, если у вас достаточно повторной попытки.
Простым способом является установка вашего Observable.range
на 1 больше, чем количество повторений.
Затем в вашей zip-функции проверьте текущее количество попыток. Если он равен NUMBER_OF_RETRIES + 1, верните Observable.error(throwable)
или повторно выбросите исключение.
Е.Г.
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
if (attempt == NUMBER_OF_RETRIES + 1) {
throw Throwables.propagate(throwable);
}
else {
return attempt;
}
}).flatMap(i -> {
System.out.println("delaying retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
В качестве стороннего doOnError
никак не влияет на Observable - он просто предоставляет вам крючок для выполнения какого-либо действия, если возникает ошибка. Общим примером является протоколирование.
Ответ 2
В документе retryWhen
говорится, что он передает уведомление onError
своим подписчикам и завершает работу. Итак, вы можете сделать что-то вроде этого:
final int ATTEMPTS = 3;
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> attempts
.zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
i < ATTEMPTS ?
Observable.timer(i, SECONDS) :
Observable.error(n))
.flatMap(x -> x))
.toBlocking()
.forEach(System.out::println);
Ответ 3
Один параметр использует Observable.materialize()
для преобразования элементов Observable.range()
в уведомления. Затем, когда выдается onCompleted()
, можно распространять ошибку ниже по течению (в примере ниже Pair
используется для переноса уведомлений и исключений Observable.range()
Observable
)
@Test
public void retryWhen() throws Exception {
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
.flatMap(notifAndEx -> {
System.out.println("delay retry by " + notifAndEx + " second(s)");
return notifAndEx.getRight().isOnCompleted()
? Observable.<Integer>error(notifAndEx.getLeft())
: Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS);
});
}).toBlocking().forEach(System.out::println);
}
private static class Pair<L,R> {
private final L left;
private final R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
public L getLeft() {
return left;
}
public R getRight() {
return right;
}
}
Ответ 4
Вы можете получить нужное поведение, используя конструктор RetryWhen
в rxjava-extras, который находится на Maven Central. Используйте последнюю версию.
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
})
.retryWhen(RetryWhen
.delays(Observable.range(1, 3)
.map(n -> (long) n),
TimeUnit.SECONDS).build())
.doOnError(e -> e.printStackTrace())
.toBlocking().forEach(System.out::println);
Ответ 5
Вам нужно использовать onErrorResumeNext после retryWhen
В вашем примере
Observable.create((Subscriber<? super String> s) -> {
System.out.println("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> {
if (i == NUMBER_OF_RETRIES + 1) {
throw Throwables.propagate(n);
}
else {
return i;
}
}).flatMap(i -> {
System.out.println("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
})
.onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage());
return Observable.error(t);
})
.toBlocking().forEach(System.out::println);
Внизу этого класса вы можете увидеть практический пример, чтобы понять, как это работает. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java
Ответ 6
Вы можете использовать функцию сканирования, которая возвращает пару с накопленным индексом и решает, следует ли передавать ошибку:
.retryWhen(attempts ->
return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value))
.flatMap(pair -> {
if(pair.first > MAX_RETRY_COUNT) {
throw new RuntimeException(pair.second);
}
return Observable.timer(pair.first, TimeUnit.SECONDS);
});
Или вы можете придерживаться оператора zipWith
, но увеличивать число в range
Наблюдаемое и возвращать пару вместо одного индекса. Таким образом, вы не потеряете информацию о предыдущем throwable
.
attempts
.zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable))
.flatMap(pair -> {
if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second);
System.out.println("delay retry by " + pair.first + " second(s)");
return Observable.timer(pair.first, TimeUnit.SECONDS);
});