Ошибка поймать, если retryWhen: s истекает

В документации для RetryWhen пример выглядит следующим образом:

Observable.create((Subscriber<? super String> s) -> {
  System.out.println("subscribing");
  s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
  return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
      System.out.println("delay retry by " + i + " second(s)");
      return Observable.timer(i, TimeUnit.SECONDS);
  });
}).toBlocking().forEach(System.out::println);

Но как я могу распространять ошибку, если истечет срок повтора?

Добавление .doOnError(System.out::println) после предложения retryWhen не вызывает ошибки. Является ли оно даже испускаемым?

Добавление .doOnError(System.out::println) перед повторной попыткой. Показывает always fails для всех попыток.

Ответы

Ответ 1

Javadoc для retryWhen заявляет, что:

Если этот Observable вызывает onComplete или onError, повторите попытку onCompleted или onError на дочерней подписке.

Проще говоря, если вы хотите распространять исключение, вам нужно будет восстановить исходное исключение, если у вас достаточно повторной попытки.

Простым способом является установка вашего Observable.range на 1 больше, чем количество повторений.

Затем в вашей zip-функции проверьте текущее количество попыток. Если он равен NUMBER_OF_RETRIES + 1, верните Observable.error(throwable) или повторно выбросите исключение.

Е.Г.

Observable.create((Subscriber<? super String> s) -> {
            System.out.println("subscribing");
            s.onError(new RuntimeException("always fails"));
        }).retryWhen(attempts -> {
            return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> {
                if (attempt == NUMBER_OF_RETRIES + 1) {
                    throw Throwables.propagate(throwable);
                }
                else {
                    return attempt;
                }
            }).flatMap(i -> {
                System.out.println("delaying retry by " + i + " second(s)");
                return Observable.timer(i, TimeUnit.SECONDS);
            });
        }).toBlocking().forEach(System.out::println);

В качестве стороннего doOnError никак не влияет на Observable - он просто предоставляет вам крючок для выполнения какого-либо действия, если возникает ошибка. Общим примером является протоколирование.

Ответ 2

В документе retryWhen говорится, что он передает уведомление onError своим подписчикам и завершает работу. Итак, вы можете сделать что-то вроде этого:

    final int ATTEMPTS = 3;

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> attempts
            .zipWith(Observable.range(1, ATTEMPTS), (n, i) ->
                    i < ATTEMPTS ?
                            Observable.timer(i, SECONDS) :
                            Observable.error(n))
            .flatMap(x -> x))
            .toBlocking()
            .forEach(System.out::println);

Ответ 3

Один параметр использует Observable.materialize() для преобразования элементов Observable.range() в уведомления. Затем, когда выдается onCompleted(), можно распространять ошибку ниже по течению (в примере ниже Pair используется для переноса уведомлений и исключений Observable.range() Observable)

   @Test
   public void retryWhen() throws Exception {

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new)
           .flatMap(notifAndEx -> {
            System.out.println("delay retry by " + notifAndEx + " second(s)");
            return notifAndEx.getRight().isOnCompleted()
                    ? Observable.<Integer>error(notifAndEx.getLeft())
                    : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS);
        });
    }).toBlocking().forEach(System.out::println);
}

    private static class Pair<L,R> {
        private final L left;
        private final R right;

        public Pair(L left, R right) {
            this.left = left;
            this.right = right;
        }

        public L getLeft() {
            return left;
        }

        public R getRight() {
            return right;
        }
    }

Ответ 4

Вы можете получить нужное поведение, используя конструктор RetryWhen в rxjava-extras, который находится на Maven Central. Используйте последнюю версию.

Observable.create((Subscriber<? super String> s) -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}) 
.retryWhen(RetryWhen
   .delays(Observable.range(1, 3)
               .map(n -> (long) n), 
            TimeUnit.SECONDS).build())
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println);

Ответ 5

Вам нужно использовать onErrorResumeNext после retryWhen

В вашем примере

    Observable.create((Subscriber<? super String> s) -> {
        System.out.println("subscribing");
        s.onError(new RuntimeException("always fails"));
    }).retryWhen(attempts -> {
        return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> {
            if (i == NUMBER_OF_RETRIES + 1) {
                throw Throwables.propagate(n);
            }
            else {
                return i;
            }
        }).flatMap(i -> {
            System.out.println("delay retry by " + i + " second(s)");
            return Observable.timer(i, TimeUnit.SECONDS);
        });
    })
    .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage());
                                              return Observable.error(t);
                                          })
    .toBlocking().forEach(System.out::println);

Внизу этого класса вы можете увидеть практический пример, чтобы понять, как это работает. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

Ответ 6

Вы можете использовать функцию сканирования, которая возвращает пару с накопленным индексом и решает, следует ли передавать ошибку:

.retryWhen(attempts -> 
    return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value))
            .flatMap(pair -> {
                if(pair.first > MAX_RETRY_COUNT) {
                    throw new RuntimeException(pair.second);
                }
                return Observable.timer(pair.first, TimeUnit.SECONDS);
            });

Или вы можете придерживаться оператора zipWith, но увеличивать число в range Наблюдаемое и возвращать пару вместо одного индекса. Таким образом, вы не потеряете информацию о предыдущем throwable.

attempts
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable))
    .flatMap(pair -> {
        if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second);
        System.out.println("delay retry by " + pair.first + " second(s)");
        return Observable.timer(pair.first, TimeUnit.SECONDS);
    });