Одиночный наблюдаемый с несколькими подписчиками

У меня есть Observable<<List<Foo>> getFoo(), который создается из Retrofit Service и после вызова .getFoo(), мне нужно поделиться им с несколькими подписчиками. Однако вызов метода .share() приводит к повторному исполнению сетевого вызова. Оператор воспроизведения не работает. Я знаю, что потенциальное решение может быть .cache(), но я не знаю, почему это поведение вызвано.

Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();

    // Create an instance of our GitHub API interface.

    // Create a call instance for looking up Retrofit contributors.

    Observable<List<Contributor>> testObservable = retrofit
            .create(GitHub.class)
            .contributors("square", "retrofit")
            .share();


    Subscription subscription1 = testObservable
    .subscribe(new Subscriber<List<Contributor>>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onNext(List<Contributor> contributors) {
            System.out.println(contributors);
        }
    });

    Subscription subscription2 = testObservable
            .subscribe(new Subscriber<List<Contributor>>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onNext(List<Contributor> contributors) {
                    System.out.println(contributors + " -> 2");
                }
            });

    subscription1.unsubscribe();
    subscription2.unsubscribe();

Приведенный выше код может воспроизвести вышеупомянутое поведение. Вы можете отладить его и увидеть, что полученные списки принадлежат другому MemoryAddress.

Я также рассматривал ConnectableObservables как потенциальное решение, но для этого требуется, чтобы я носил оригинал, наблюдаемый вокруг, и вызывал .connect() каждый раз, когда я хочу добавить нового Абонента.

Такое поведение с .share() работало нормально до Retrofit 1.9. Он перестал работать на Retrofit 2 - beta. Я еще не тестировал его с версией Retrofit 2 Release, которая была выпущена несколько часов назад.

EDIT: 01/02/2017

Для будущих читателей я написал статью здесь, объясняющую больше об этом случае!

Ответы

Ответ 1

Кажется, вы (неявно) вернули свой ConnectedObservable, возвращенный .share() обратно в обычный Observable. Возможно, вам захочется прочитать разницу между горячими и холодными наблюдаемыми.

Попробуйте

ConnectedObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share();

Subscription subscription1 = testObservable
   .subscribe(new Subscriber<List<Contributor>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onNext(List<Contributor> contributors) {
        System.out.println(contributors);
    }
});

Subscription subscription2 = testObservable
        .subscribe(new Subscriber<List<Contributor>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(List<Contributor> contributors) {
                System.out.println(contributors + " -> 2");
            }
        });

testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();

Изменить: вам не нужно вызывать connect() каждый раз, когда вам нужна новая подписка, вам нужно только его, чтобы запустить наблюдаемый. Я полагаю, вы могли бы использовать replay(), чтобы убедиться, что все последующие подписчики получают все созданные предметы

ConnectedObservable<List<Contributor>> testObservable = retrofit
        .create(GitHub.class)
        .contributors("square", "retrofit")
        .share()
        .replay()

Ответ 2

После проверки с разработчиком RxJava Dávid Karnok я хотел бы предложить полное объяснение того, что здесь происходит.

share() определяется как publish().refCount(), i. е. источник Observable сначала преобразуется в ConnectableObservable на publish(), но вместо того, чтобы называть connect() "вручную", эта часть обрабатывается refCount(). В частности, refCount вызовет connect() на ConnectableObservable, когда он сам получит первую подписку; то, если есть хотя бы один абонент, он будет оставаться подписчиком; и, наконец, когда число подписчиков снизится до 0, оно отменяет подписку вверх. С холодным Observables, как и те, которые возвращены Retrofit, это остановит любые текущие вычисления.

Если после одного из этих циклов появится еще один абонент, refCount снова вызовет connect и, таким образом, инициирует новую подписку на источник Observable. В этом случае он инициирует другой сетевой запрос.

Теперь это, как правило, не стало очевидным с Retrofit 1 (и даже с любой версией до this commit), поскольку эти старые версии Retrofit по умолчанию переместили все сетевые запросы к другому потоку. Обычно это означало, что все ваши вызовы subscribe() произойдут, когда первый запрос /Observable все еще работает, и поэтому новый Subscriber будет просто добавлен в refCount и поэтому не будет запускать дополнительные запросы /Observables.

Однако новые версии Retrofit по умолчанию больше не переносят работу в другой поток - вы должны сделать это явно, вызывая, например, subscribeOn(Schedulers.io()). Если вы этого не сделаете, все останется только в текущем потоке, что означает, что второй subscribe() произойдет только после того, как первый Observable вызвал onCompleted и, следовательно, все Subscribers отказались от подписки и все закрыто, Теперь, как мы видели в первом абзаце, когда вызывается второй subscribe(), share() не имеет другого выбора, кроме как вызвать другой Subscription для источника Observable и инициировать другой сетевой запрос.

Итак, чтобы вернуться к поведению, к которому вы привыкли, от Retrofit 1, просто добавьте subscribeOn(Schedulers.io()).

Это приведет к выполнению только сетевого запроса - большую часть времени. В принципе, вы все равно можете получить несколько запросов (и вы всегда можете использовать с Retrofit 1), но только если ваши сетевые запросы очень быстрые и/или вызовы subscribe() происходят со значительной задержкой, так что, опять же, первый запрос завершается, когда происходит второй subscribe().

Поэтому Давид предлагает либо использовать cache() (но он имеет недостатки, о которых вы упоминали), либо replay().autoConnect(). В соответствии с этими примечаниями к выпуску autoConnect работает только в первой половине refCount, а точнее,

похож на поведение refCount(), за исключением того, что он не отключается когда подписчики теряются.

Это означает, что запрос будет срабатывать только тогда, когда произойдет первый subscribe(), но затем все последующие Subscriber будут получать все испущенные элементы, независимо от того, были ли они в любое время между 0 подписчиками.