Подписчики Vs подписываются в RxJava2 (Android)?

Когда вызывать метод subscribeWith, а не просто подписываться? И что такое прецедент?

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));

VS

   compositeDisposable.add(get()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
              //  .subscribe(this::handleResponse, this::handleError);
                .subscribeWith(new DisposableObserver<News>() {
                    @Override public void onNext(News value) {
                        handleResponse(value);
                    }

                    @Override public void onError(Throwable e) {
                        handleError(e);
                    }

                    @Override public void onComplete() {
                       // dispose here ? why? when the whole thing will get disposed later
                       //via  compositeDisposable.dispose();  in onDestroy();
                    }
                }));

Спасибо


Добавлено позже

В соответствии с документацией оба возвращают одноразовые экземпляры SingleObserver:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;
}

Где класс ConsumerSingleObserver реализует SingleObserver и одноразовые.

Ответы

Ответ 1

Замечание # подписаться на объяснение:

В первом фрагменте кода:

.subscribe(this:: handleResponse, this:: handleError));

Фактически вы используете один из нескольких перегруженных методов Observable#subscribe:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

Существует еще один, который также выполняет Action для выполнения onComplete:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete) {

И еще один вариант позволяет просто передать Observer (ПРИМЕЧАНИЕ: метод void) (Изменить 2 - этот метод определен в ObservableSource, который является интерфейсом, который Observable продолжается.)

public final void subscribe(Observer<? super T> observer)

Во втором фрагменте кода в вашем вопросе вы использовали метод subscribeWith, который просто возвращает Observer, который вы передали (для удобства/кэширования и т.д.):

public final <E extends Observer<? super T>> E subscribeWith(E observer)

Обозреватель # onComplete:

Наблюдатель # onComplete вызывается после того, как Observable выпустил все элементы в потоке. Из java-документа:

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();

Так, например, если get() в ваших фрагментах кода возвратил Observable, который испускал несколько объектов News, каждый из них будет обрабатываться в Observer#onNext. Здесь вы можете обрабатывать каждый элемент.

После того, как все они были обработаны (и при условии, что ошибка не возникла), будет вызван вызов onComplete. Здесь вы можете выполнить любые дополнительные действия, которые вам нужно выполнить (например, обновить интерфейс), зная, что вы обработали все объекты News.

Это не следует путать с Disposable#dispose, который вызывается, когда наблюдаемый поток заканчивается (завершение/ошибка) или вручную вами, чтобы прекратить наблюдение (здесь появляется CompositeDisposable, поскольку он помогает вам распоряжаться всего вашего Disposable, который он содержит сразу).

Если в вашем сценарии get() вернет Observable, который испускает только один элемент, вместо использования Observable рассмотрите использование io.reactivex.Single, где вы обрабатываете только один элемент (в onSuccess) и не нужно указывать Action для onComplete:)

Изменить: ответ на ваш комментарий:

Однако я до сих пор не использую subscribeWith, вы сказали, что он проходит наблюдатель за кешированием и т.д., куда он переходит? на полном? а также из того, что я понял, подписывается, на самом деле не потребляет видимое (или одно) право?

Чтобы прояснить объяснение subscribeWith, я имел в виду, что он будет потреблять объект Observer, который вы передали в subscribeWith (точно так же, как метод subscribe) он дополнительно вернет тот же Наблюдатель прямо к вам. На момент написания статьи реализация subscribeWith:

public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

Следовательно, subscribeWith может использоваться взаимозаменяемо с subscribe.

Можете ли вы привести пример использования subscribeWith с примером? я думаю что полностью ответит на вопрос

subscribeWith javadoc дает следующий пример использования:

Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();

ResourceObserver<Integer> rs = new ResourceObserver<>() {
     // ...
};

composite.add(source.subscribeWith(rs));

См. здесь, использование subscribeWith вернет тот же самый объект ResourceObserver, который был создан. Это дает возможность выполнить подписку и добавить ResourceObserver в CompositeDisposable в одну строку (обратите внимание, что ResourceObservable реализует Disposable.)

Изменить 2 Ответ на второй комментарий.

source.subscribeWith(RS); source.subscribe(RS); оба возвращаются Экземпляр SingleObserver,

ObservableSource#subscribe(Observer <? super T> observer) НЕ возвращает Observer. Это недействительный метод (см. Примечание в разделе объяснения Observable # subscribe выше). В то время как Observable#subscribeWith DOES возвращает Observer. Если бы нам пришлось переписать пример кода использования с помощью ObservableSource#subscribe, мы должны были бы сделать это в двух строках, например:

source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);

В то время как метод Observable#subscribeWith сделал нам удобным сделать это в одной строке composite.add(source.subscribeWith(rs));

Он может запутаться во всех перегруженных методах подписки, которые выглядят несколько схожими, однако есть различия (некоторые из которых тонкие). Рассмотрение кода и документации помогает обеспечить различие между ними.


Изменить 3 Еще один пример использования для subscribeWith

Метод subscribeWith полезен, когда у вас есть конкретная реализация Observer, которую вы можете повторно использовать. Например, в приведенном выше примере кода он предоставил конкретную реализацию ResourceObserver в подписке, тем самым наследуя ее функциональность, все еще позволяя обрабатывать onNext onError и onComplete.

В другом примере используйте: для примера кода в вашем вопросе, что, если вы хотите выполнить одну и ту же подписку для ответа get() в нескольких местах?

Вместо того, чтобы копировать реализации Consumer для onNext и onError для разных классов, вы можете вместо этого определить новый класс, например.

//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
    //implement your onNext, onError, onComplete.
    ....
}

Теперь, когда вы делаете этот запрос get(), вы можете просто подписаться, выполнив:

compositeDisposable.add(get()
    ...
    .subscribeWith(new GetNewsObserver()));

См. здесь код прост, вы сохраняете разделение ответственности за обработку ответа и теперь можете повторно использовать GetNewsObserver везде, где хотите.