RxJava + Retrofit → BaseObservable для вызовов API для централизованной обработки ответов

Я новичок в RxJava, поэтому, пожалуйста, простите меня, если это звучит слишком новичком: -).

На данный момент у меня есть абстрактный CallbackClass, который реализует обратный вызов Retofit. Там я улавливаю методы обратного вызова "onResponse" и "onError" и обрабатываю различные типы ошибок, прежде чем, наконец, переадресую пользовательские реализованные методы. Я также использую этот централизованный класс для ведения журнала запросов и ответов и других материалов.

Например: для конкретных кодов ошибок из моего сервера я получаю новый токен Auth в теле ответа, обновляю токен, а затем clone.enqueue вызов. Есть, конечно, несколько других глобальных действий для ответов с моего сервера.

Текущее решение (без Rx):

    public abstract void onResponse(Call<T> call, Response<T> response, boolean isSuccess);

    public abstract void onFailure(Call<T> call, Response<T> response, Throwable t, boolean isTimeout);

    @Override
    public void onResponse(Call<T> call, Response<T> response) {
        if (_isCanceled) return;

        if (response != null && !response.isSuccessful()) {
            if (response.code() == "SomeCode" && retryCount < RETRY_LIMIT) {
                TokenResponseModel newToken = null;
                try {
                    newToken = new Gson().fromJson(new String(response.errorBody().bytes(), "UTF-8"), TokenResponseModel.class);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                    SomeClass.token = newToken.token;
                    retryCount++;
                    call.clone().enqueue(this);
                    return;
                }
            }
        } else {
            onResponse(call, response, true);
            removeFinishedRequest();
            return;
        }

        onFailure(call, response, null, false);
        removeFinishedRequest();
    }

    @Override
    public void onFailure(Call<T> call, Throwable t) {
        if (_isCanceled) return;

        if (t instanceof UnknownHostException)
            if (eventBus != null)
                eventBus.post(new NoConnectionErrorEvent());

        onFailure(call, null, t, false);
        removeFinishedRequest();
    }

Мой вопрос: есть ли способ иметь такое централизованное поведение обработки ответов, прежде чем, наконец, цепочка (или повторная попытка) вернется к методам подписчика?

Я нашел эти 2 ссылки, которые имеют хорошую отправную точку, но не конкретное решение. Любая помощь будет действительно оценена.

Принудительный запрос запроса после пользовательских исключений API в RxJava

Операторы обработки ошибок Retrofit 2 и RxJava

Ответы

Ответ 1

Две ссылки, которые вы предоставили, являются действительно хорошей отправной точкой, которую я использовал для создания решения для реагирования на случайные

  • сетевые ошибки иногда возникают из-за временного отсутствия сетевого подключения или переключения на стандарт с низкой пропускной способностью, например EDGE, что вызывает SocketTimeoutException
  • ошибки сервера → иногда происходят из-за перегрузки сервера

Я обработал ошибки CallAdapter.Factory, чтобы обработать ошибки и соответствующим образом отреагировать на них.

  • Импортируйте RetryWithDelayIf из решения, который вы нашли

  • Переопределить CallAdapter.Factory для обработки ошибок:

    public class RxCallAdapterFactoryWithErrorHandling extends CallAdapter.Factory {
        private final RxJavaCallAdapterFactory original;
    
        public RxCallAdapterFactoryWithErrorHandling() {
            original = RxJavaCallAdapterFactory.create();
        }
    
        @Override
        public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
            return new RxCallAdapterWrapper(retrofit, original.get(returnType, annotations, retrofit));
        }
    
        public class RxCallAdapterWrapper implements CallAdapter<Observable<?>> {
            private final Retrofit retrofit;
            private final CallAdapter<?> wrapped;
    
            public RxCallAdapterWrapper(Retrofit retrofit, CallAdapter<?> wrapped) {
                this.retrofit = retrofit;
                this.wrapped = wrapped;
            }
    
            @Override
            public Type responseType() {
                return wrapped.responseType();
            }
    
            @SuppressWarnings("unchecked")
            @Override
            public <R> Observable<?> adapt(final Call<R> call) {
                return ((Observable) wrapped.adapt(call)).onErrorResumeNext(new Func1<Throwable, Observable>() {
                    @Override
                    public Observable call(Throwable throwable) {
                        Throwable returnThrowable = throwable;
                        if (throwable instanceof HttpException) {
                            HttpException httpException = (HttpException) throwable;
                            returnThrowable = httpException;
                            int responseCode = httpException.response().code();
                            if (NetworkUtils.isClientError(responseCode)) {
                                returnThrowable = new HttpClientException(throwable);
                            }
                            if (NetworkUtils.isServerError(responseCode)) {
                                returnThrowable = new HttpServerException(throwable);
                            }
                        }
    
                        if (throwable instanceof UnknownHostException) {
                            returnThrowable = throwable;
                        }
    
                        return Observable.error(returnThrowable);
                    }
                }).retryWhen(new RetryWithDelayIf(3, DateUtils.SECOND_IN_MILLIS, new Func1<Throwable, Boolean>() {
                    @Override
                    public Boolean call(Throwable throwable) {
                        return throwable instanceof HttpServerException
                                || throwable instanceof SocketTimeoutException
                                || throwable instanceof UnknownHostException;
                    }
                }));
            }
        }
    }
    

    HttpServerException является обычным исключением.

  • Используйте его в Retrofit.Builder

    Retrofit retrofit = new Retrofit.Builder()
            .addCallAdapterFactory(new RxCallAdapterFactoryWithErrorHandling())
            .build();
    

Экстра. Если вы хотите проанализировать ошибки, возникающие из API (ошибка, которая не вызывает UnknownHostException, HttpException или MalformedJsonException или т.д.), вам необходимо переопределить Factory и использовать пользовательский во время создания экземпляра Retrofit. Разберите ответ и проверьте, содержит ли он ошибки. Если "да", тогда ошибка броска и ошибки будут обрабатываться внутри метода выше.

Ответ 2

Считаете ли вы использование адаптера rxjava для переоснащения? https://mvnrepository.com/artifact/com.squareup.retrofit2/adapter-rxjava/2.1.0 в файле gradle добавить

compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'

здесь интерфейс для модернизации

public interface Service {
@GET("userauth/login?")
Observable<LoginResponse> getLogin(
        @Query("v") String version,
        @Query("username") String username,
        @Query("password") String password);
}

и здесь моя реализация

Service.getLogin(
            VERSION,
            "username",
            "password")
            .subscribe(new Subscriber<LoginResponse>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(LoginResponse loginResponse) {

                }
            });

Обратите внимание, что я использую gson converter factory для анализа моего ответа, поэтому я возвращаю POJO (объект Plain Ole Java Object).

Ответ 3

Посмотрите, как вы можете это сделать.  Здесь api call и передать модель запроса и модель ответа в этом.

public interface RestService {
//SEARCH_USER
@POST(SEARCH_USER_API_LINK)
Observable<SearchUserResponse> getSearchUser(@Body SearchUserRequest getSearchUserRequest);
}

Это переоснащение, я использовал retrofit2

public RestService getRestService() {

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(ApiConstants.BASE_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .client(getOkHttpClient())
            .build();

    return retrofit.create(RestService.class);
}

//get OkHttp instance
@Singleton
@Provides
public OkHttpClient getOkHttpClient() {

    HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor();
    httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);

    OkHttpClient.Builder builder = new OkHttpClient.Builder();
    builder.interceptors().add(httpLoggingInterceptor);
    builder.readTimeout(60, TimeUnit.SECONDS);
    builder.connectTimeout(60, TimeUnit.SECONDS);
    return builder.build();
}

Это вызов api, назовите его в своей деятельности.

@Inject
Scheduler mMainThread;
@Inject
Scheduler mNewThread;

 //getSearchUser api method
public void getSearchUser(String user_id, String username) {

    SearchUserRequest searchUserRequest = new SearchUserRequest(user_id, username);

    mObjectRestService.getSearchUser(searchUserRequest).
            subscribeOn(mNewThread).
            observeOn(mMainThread).
            subscribe(searchUserResponse -> {
                Timber.e("searchUserResponse :" + searchUserResponse.getResponse().getResult());
                if (isViewAttached()) {
                    getMvpView().hideProgress();
                    if (searchUserResponse.getResponse().getResult() == ApiConstants.STATUS_SUCCESS) {

                    } else {

                    }
                }
            }, throwable -> {
                if (isViewAttached()) {

                }
            });
}

Надеюсь, это поможет вам.