RxJava2 наблюдаемый прием бросает UndeliverableException

Как я понимаю, RxJava2 values.take(1) создает другое Observable, которое содержит только один элемент из исходного Observable. Какой НЕ ДОЛЖЕН выдавать исключение, поскольку он отфильтровывается эффектом take(1), как это произошло во втором.

как в следующем фрагменте кода

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Выход

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

Мои вопросы:

  • Я понимаю, что это правильно?
  • Что действительно происходит, чтобы вызвать исключение.
  • Как это решить от потребителя?

Ответы

Ответ 1

  • Да, но поскольку наблюдаемые "концы" не означают, что код, запущенный внутри create(...), останавливается. Чтобы быть в полной безопасности, в этом случае вам нужно использовать o.isDisposed(), чтобы увидеть, завершился ли наблюдаемый downstream.
  • Исключение есть потому, что RxJava 2 имеет политику NEVER, позволяющую потерять вызов onError. Он либо передается вниз по потоку, либо генерируется как глобальный UndeliverableException, если наблюдаемое уже завершено. Разработчик Observable должен "правильно" обрабатывать случай, когда наблюдаемое завершено, и возникает Исключение.
  • Проблема заключается в том, что производитель (Observable) и потребитель (Subscriber) не соглашаются, когда поток заканчивается. Поскольку производитель в этом случае переживает потребителя, проблема может быть устранена только у производителя.

Ответ 2

@Kiskae в предыдущем комментарии правильно ответил о причине возникновения такого исключения.

Вот ссылка на официальный документ по этой теме: RxJava2-wiki.

Иногда вы не можете изменить это поведение, поэтому есть способ обработки этого UndeliverableException. Вот фрагмент кода о том, как избежать сбоев и неправильного поведения:

RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

Этот код взят по ссылке выше.

Важная заметка. Этот подход устанавливает глобальный обработчик ошибок в RxJava, поэтому, если вы можете избавиться от этих исключений - это был бы лучший вариант.