Одиночный наблюдаемый с несколькими подписчиками
У меня есть Observable<<List<Foo>> getFoo()
, который создается из Retrofit Service и после вызова
.getFoo()
, мне нужно поделиться им с несколькими подписчиками. Однако вызов метода .share()
приводит к повторному исполнению сетевого вызова. Оператор воспроизведения не работает. Я знаю, что потенциальное решение может быть .cache()
, но я не знаю, почему это поведение вызвано.
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(API_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
// Create an instance of our GitHub API interface.
// Create a call instance for looking up Retrofit contributors.
Observable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors + " -> 2");
}
});
subscription1.unsubscribe();
subscription2.unsubscribe();
Приведенный выше код может воспроизвести вышеупомянутое поведение. Вы можете отладить его и увидеть, что полученные списки принадлежат другому MemoryAddress.
Я также рассматривал ConnectableObservables как потенциальное решение, но для этого требуется, чтобы я носил оригинал, наблюдаемый вокруг, и вызывал .connect()
каждый раз, когда я хочу добавить нового Абонента.
Такое поведение с .share()
работало нормально до Retrofit 1.9. Он перестал работать на Retrofit 2 - beta. Я еще не тестировал его с версией Retrofit 2 Release, которая была выпущена несколько часов назад.
EDIT: 01/02/2017
Для будущих читателей я написал статью здесь, объясняющую больше об этом случае!
Ответы
Ответ 1
Кажется, вы (неявно) вернули свой ConnectedObservable
, возвращенный .share()
обратно в обычный Observable
. Возможно, вам захочется прочитать разницу между горячими и холодными наблюдаемыми.
Попробуйте
ConnectedObservable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share();
Subscription subscription1 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors);
}
});
Subscription subscription2 = testObservable
.subscribe(new Subscriber<List<Contributor>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(List<Contributor> contributors) {
System.out.println(contributors + " -> 2");
}
});
testObservable.connect();
subscription1.unsubscribe();
subscription2.unsubscribe();
Изменить: вам не нужно вызывать connect()
каждый раз, когда вам нужна новая подписка, вам нужно только его, чтобы запустить наблюдаемый. Я полагаю, вы могли бы использовать replay()
, чтобы убедиться, что все последующие подписчики получают все созданные предметы
ConnectedObservable<List<Contributor>> testObservable = retrofit
.create(GitHub.class)
.contributors("square", "retrofit")
.share()
.replay()
Ответ 2
После проверки с разработчиком RxJava Dávid Karnok я хотел бы предложить полное объяснение того, что здесь происходит.
share()
определяется как publish().refCount()
, i. е. источник Observable
сначала преобразуется в ConnectableObservable
на publish()
, но вместо того, чтобы называть connect()
"вручную", эта часть обрабатывается refCount()
. В частности, refCount
вызовет connect()
на ConnectableObservable
, когда он сам получит первую подписку; то, если есть хотя бы один абонент, он будет оставаться подписчиком; и, наконец, когда число подписчиков снизится до 0, оно отменяет подписку вверх. С холодным Observables
, как и те, которые возвращены Retrofit, это остановит любые текущие вычисления.
Если после одного из этих циклов появится еще один абонент, refCount
снова вызовет connect
и, таким образом, инициирует новую подписку на источник Observable. В этом случае он инициирует другой сетевой запрос.
Теперь это, как правило, не стало очевидным с Retrofit 1 (и даже с любой версией до this commit), поскольку эти старые версии Retrofit по умолчанию переместили все сетевые запросы к другому потоку. Обычно это означало, что все ваши вызовы subscribe()
произойдут, когда первый запрос /Observable
все еще работает, и поэтому новый Subscriber
будет просто добавлен в refCount
и поэтому не будет запускать дополнительные запросы /Observables
.
Однако новые версии Retrofit по умолчанию больше не переносят работу в другой поток - вы должны сделать это явно, вызывая, например, subscribeOn(Schedulers.io())
. Если вы этого не сделаете, все останется только в текущем потоке, что означает, что второй subscribe()
произойдет только после того, как первый Observable
вызвал onCompleted
и, следовательно, все Subscribers
отказались от подписки и все закрыто, Теперь, как мы видели в первом абзаце, когда вызывается второй subscribe()
, share()
не имеет другого выбора, кроме как вызвать другой Subscription
для источника Observable и инициировать другой сетевой запрос.
Итак, чтобы вернуться к поведению, к которому вы привыкли, от Retrofit 1, просто добавьте subscribeOn(Schedulers.io())
.
Это приведет к выполнению только сетевого запроса - большую часть времени. В принципе, вы все равно можете получить несколько запросов (и вы всегда можете использовать с Retrofit 1), но только если ваши сетевые запросы очень быстрые и/или вызовы subscribe()
происходят со значительной задержкой, так что, опять же, первый запрос завершается, когда происходит второй subscribe()
.
Поэтому Давид предлагает либо использовать cache()
(но он имеет недостатки, о которых вы упоминали), либо replay().autoConnect()
. В соответствии с этими примечаниями к выпуску autoConnect
работает только в первой половине refCount
, а точнее,
похож на поведение refCount(), за исключением того, что он не отключается когда подписчики теряются.
Это означает, что запрос будет срабатывать только тогда, когда произойдет первый subscribe()
, но затем все последующие Subscriber
будут получать все испущенные элементы, независимо от того, были ли они в любое время между 0 подписчиками.