Как игнорировать ошибку и продолжать бесконечный поток?

Я хотел бы знать, как игнорировать исключения и продолжать бесконечный поток (в моем случае поток местоположений)?

Я извлекаю текущую позицию пользователя (используя Android-ReactiveLocation), а затем отправляю их в свой API (используя Retrofit).

В моем случае, когда возникает исключение во время вызова сети (например, тайм-аута), метод onError вызывается и поток останавливается сам. Как этого избежать?

Активность:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

RestService:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}

Ответы

Ответ 1

mRestService.postLocations(locations) испускает один элемент, затем завершается. Если произошла ошибка, то выдается ошибка, которая завершает поток.

Когда вы вызываете этот метод в flatMap, ошибка продолжается для вашего "основного" потока, а затем ваш поток останавливается.

Что вы можете сделать, это преобразовать вашу ошибку в другой элемент (как описано здесь: fooobar.com/questions/383226/...), но не в вашем основном потоке (как я полагаю, вы уже пробовали), а на mRestService.postLocations(locations).

Таким образом, этот вызов выдаст ошибку, которая будет преобразована в элемент/другую наблюдаемую и затем завершена. (без вызова onError).

С точки зрения потребителя, mRestService.postLocations(locations) будет испускать один элемент, а затем завершать, как если бы все mRestService.postLocations(locations) успешно.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

Ответ 2

Возможно, вы захотите использовать один из операторов обработки ошибок.

  • onErrorResumeNext( ) - инструктирует Observable испускать последовательность элементов, если встречает ошибку
  • onErrorReturn( ) - инструктирует Observable испускать определенный элемент при обнаружении ошибки
  • onExceptionResumeNext( ) - инструктирует Observable продолжить испускать элементы после того, как он встретит исключение (но не другое разнообразие броски)
  • retry( ) - если источник Observable испускает ошибку, повторно подпишитесь на нее в надежде, что она завершится без ошибок.
  • retryWhen( ) - если источник Observable испускает ошибку, передайте эту ошибку другому наблюдаемому, чтобы определить, следует ли повторно подписываться на источник

Especialy retry и onExceptionResumeNext выглядят многообещающими в вашем случае.

Ответ 3

Просто вставляя информацию о ссылке из @MikeN, ответьте, что она потерялась:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

и использовать его близко к наблюдаемому источнику, поскольку другие операторы могут с нетерпением отказаться от подписки до этого.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();

Обратите внимание, однако, что это подавляет доставку ошибок из источник. Если какой-либо onNext в цепочке после того, как он выдает исключение, это вероятно, источник будет отписано.

Ответ 4

Если вы просто хотите игнорировать ошибку внутри flatMap , не возвращая элемент, выполните следующее:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);

Ответ 5

Попробуйте вызвать службу останова в вызове Observable.defer. Таким образом, для каждого вызова у вас будет возможность использовать свой собственный 'onErrorResumeNext', и ошибки не вызовут ваш основной поток.

reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

Это решение из этого потока → RxJava Observable и Subscriber для пропуска исключений?, но я думаю, что это будет работать и в вашем случае.

Ответ 6

Добавьте решение этой проблемы:

privider
    .compose(ignoreErrorsTransformer)
    .subscribe()

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
        new Observable.Transformer<ResultType, ResultType>() {
            @Override
            public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
                return resultTypeObservable
                        .materialize()
                        .filter(new Func1<Notification<ResultType>, Boolean>() {
                            @Override
                            public Boolean call(Notification<ResultType> resultTypeNotification) {
                                return !resultTypeNotification.isOnError();
                            }
                        })
                        .dematerialize();

            }
        };

Ответ 7

Небольшая модификация решения (@MikeN), позволяющая завершить конечные потоки:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
                //this will allow finite streams to complete
                t1.onCompleted();
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}