Как игнорировать ошибку и продолжать бесконечный поток?
Я хотел бы знать, как игнорировать исключения и продолжать бесконечный поток (в моем случае поток местоположений)?
Я извлекаю текущую позицию пользователя (используя Android-ReactiveLocation), а затем отправляю их в свой API (используя Retrofit).
В моем случае, когда возникает исключение во время вызова сети (например, тайм-аута), метод onError
вызывается и поток останавливается сам. Как этого избежать?
Активность:
private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
.setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
.setInterval(100);
...
private void start() {
mRestService = ...;
ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
.buffer(50)
.flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
RestService:
public interface RestService {
@POST("/.../")
Observable<Response> postLocations(@Body List<Location> locations);
}
Ответы
Ответ 1
mRestService.postLocations(locations)
испускает один элемент, затем завершается. Если произошла ошибка, то выдается ошибка, которая завершает поток.
Когда вы вызываете этот метод в flatMap
, ошибка продолжается для вашего "основного" потока, а затем ваш поток останавливается.
Что вы можете сделать, это преобразовать вашу ошибку в другой элемент (как описано здесь: fooobar.com/questions/383226/...), но не в вашем основном потоке (как я полагаю, вы уже пробовали), а на mRestService.postLocations(locations)
.
Таким образом, этот вызов выдаст ошибку, которая будет преобразована в элемент/другую наблюдаемую и затем завершена. (без вызова onError
).
С точки зрения потребителя, mRestService.postLocations(locations)
будет испускать один элемент, а затем завершать, как если бы все mRestService.postLocations(locations)
успешно.
mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
.buffer(50)
.flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
Ответ 2
Возможно, вы захотите использовать один из операторов обработки ошибок.
-
onErrorResumeNext( )
- инструктирует Observable испускать последовательность элементов, если встречает ошибку
-
onErrorReturn( )
- инструктирует Observable испускать определенный элемент при обнаружении ошибки
-
onExceptionResumeNext( )
- инструктирует Observable продолжить испускать элементы после того, как он встретит исключение (но не другое разнообразие броски)
-
retry( )
- если источник Observable испускает ошибку, повторно подпишитесь на нее в надежде, что она завершится без ошибок.
-
retryWhen( )
- если источник Observable испускает ошибку, передайте эту ошибку другому наблюдаемому, чтобы определить, следует ли повторно подписываться на источник
Especialy retry
и onExceptionResumeNext
выглядят многообещающими в вашем случае.
Ответ 3
Просто вставляя информацию о ссылке из @MikeN, ответьте, что она потерялась:
import rx.Observable.Operator;
import rx.functions.Action1;
public final class OperatorSuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public OperatorSuppressError(Action1<Throwable> onError) {
this.onError = onError;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> t1) {
return new Subscriber<T>(t1) {
@Override
public void onNext(T t) {
t1.onNext(t);
}
@Override
public void onError(Throwable e) {
onError.call(e);
}
@Override
public void onCompleted() {
t1.onCompleted();
}
};
}
}
и использовать его близко к наблюдаемому источнику, поскольку другие операторы могут с нетерпением отказаться от подписки до этого.
Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();
Обратите внимание, однако, что это подавляет доставку ошибок из источник. Если какой-либо onNext в цепочке после того, как он выдает исключение, это вероятно, источник будет отписано.
Ответ 4
Если вы просто хотите игнорировать ошибку внутри flatMap
, не возвращая элемент, выполните следующее:
flatMap(item ->
restService.getSomething(item).onErrorResumeNext(Observable.empty())
);
Ответ 5
Попробуйте вызвать службу останова в вызове Observable.defer. Таким образом, для каждого вызова у вас будет возможность использовать свой собственный 'onErrorResumeNext', и ошибки не вызовут ваш основной поток.
reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
.buffer(50)
.flatMap(locations ->
Observable.defer(() -> mRestService.postLocations(locations))
.onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
)
........
Это решение из этого потока → RxJava Observable и Subscriber для пропуска исключений?, но я думаю, что это будет работать и в вашем случае.
Ответ 6
Добавьте решение этой проблемы:
privider
.compose(ignoreErrorsTransformer)
.subscribe()
private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
new Observable.Transformer<ResultType, ResultType>() {
@Override
public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
return resultTypeObservable
.materialize()
.filter(new Func1<Notification<ResultType>, Boolean>() {
@Override
public Boolean call(Notification<ResultType> resultTypeNotification) {
return !resultTypeNotification.isOnError();
}
})
.dematerialize();
}
};
Ответ 7
Небольшая модификация решения (@MikeN), позволяющая завершить конечные потоки:
import rx.Observable.Operator;
import rx.functions.Action1;
public final class OperatorSuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public OperatorSuppressError(Action1<Throwable> onError) {
this.onError = onError;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> t1) {
return new Subscriber<T>(t1) {
@Override
public void onNext(T t) {
t1.onNext(t);
}
@Override
public void onError(Throwable e) {
onError.call(e);
//this will allow finite streams to complete
t1.onCompleted();
}
@Override
public void onCompleted() {
t1.onCompleted();
}
};
}
}