RxJava2 наблюдаемый прием бросает UndeliverableException
Как я понимаю, RxJava2 values.take(1)
создает другое Observable, которое содержит только один элемент из исходного Observable. Какой НЕ ДОЛЖЕН выдавать исключение, поскольку он отфильтровывается эффектом take(1)
, как это произошло во втором.
как в следующем фрагменте кода
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
Выход
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Мои вопросы:
- Я понимаю, что это правильно?
- Что действительно происходит, чтобы вызвать исключение.
- Как это решить от потребителя?
Ответы
Ответ 1
- Да, но поскольку наблюдаемые "концы" не означают, что код, запущенный внутри
create(...)
, останавливается. Чтобы быть в полной безопасности, в этом случае вам нужно использовать o.isDisposed()
, чтобы увидеть, завершился ли наблюдаемый downstream.
- Исключение есть потому, что RxJava 2 имеет политику NEVER, позволяющую потерять вызов
onError
. Он либо передается вниз по потоку, либо генерируется как глобальный UndeliverableException
, если наблюдаемое уже завершено. Разработчик Observable должен "правильно" обрабатывать случай, когда наблюдаемое завершено, и возникает Исключение.
- Проблема заключается в том, что производитель (
Observable
) и потребитель (Subscriber
) не соглашаются, когда поток заканчивается. Поскольку производитель в этом случае переживает потребителя, проблема может быть устранена только у производителя.
Ответ 2
@Kiskae в предыдущем комментарии правильно ответил о причине возникновения такого исключения.
Вот ссылка на официальный документ по этой теме: RxJava2-wiki.
Иногда вы не можете изменить это поведение, поэтому есть способ обработки этого UndeliverableException
. Вот фрагмент кода о том, как избежать сбоев и неправильного поведения:
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});
Этот код взят по ссылке выше.
Важная заметка. Этот подход устанавливает глобальный обработчик ошибок в RxJava, поэтому, если вы можете избавиться от этих исключений - это был бы лучший вариант.