Ответ 1
Вам нужно Тема RxJava: BehaviorSubject
или AsyncSubject
В RxJava v1.0.13 введен новый тип наблюдаемого: rx.Single. Он отлично подходит для модели запроса-ответа, но не имеет стандартных побочных эффектов, вводящих операторов, таких как doOnNext(). Таким образом, гораздо сложнее сделать несколько вещей в результате.
Моя идея состояла в том, чтобы заменить doOnNext() несколькими подписками на один и тот же единственный экземпляр. Но это может привести к тому, что подстилающая работа будет выполняться несколько раз: один раз за каждую подписку.
Пример rx. Одна реализация:
private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> {
override fun call(sub: SingleSubscriber<in SomeData>) {
try {
val result = fetchSomeData()
sub.onSuccess(result)
} catch(t: Throwable) {
sub.onError(t)
}
}
}
val single = Single.create<SomeData>(WorkerSubscribe())
Использование:
single.subscribe({}, {})
single.subscribe({}, {}) // Data is fetched for the second time
Возможно ли создать экземпляр Single, который не будет fetchSomeData() несколько раз, даже когда single.subscribe() вызывается несколько раз, но кеш и возвращает тот же результат?
Вам нужно Тема RxJava: BehaviorSubject
или AsyncSubject
Мне просто нужно было подобное поведение и нашел какое-то решение.
Вы можете преобразовать Single
в Observable
применить cache()
, а затем преобразовать его обратно в Single
.
yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle()
Я использую cacheWithInitialCapacity(1)
вместо просто cache()
в качестве оптимизации - Single
никогда не будет выпускать более одного элемента.
Также неплохо обеспечить реализацию Transformer
public class SingleUtils {
public static <T> Single.Transformer<T, T> cached() {
return single -> single.toObservable()
.cacheWithInitialCapacity(1)
.toSingle();
}
}
чтобы вы могли использовать кеширование, где бы вы ни находились, вызывая только
yourSingle.compose(SingleUtils.cached())
Edit: Начиная с rxJava 1.2.2 он был добавлен (https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2)
Реализовано именно так (https://github.com/ReactiveX/RxJava/pull/4757)
Пожалуйста, проверьте cache()
оператор.
Он должен кэшировать выбросы от Observable
и воспроизводить их до следующих Subscriber
s.
Вы можете создать BehaviorSubject/ReplaySubject/AsyncSubject - и затем вызвать onSingle на нем.
Я сделал обходной путь, что меня это не устраивает, но он работает:
public class Network {
private Object data;
private Single<Object> dataSingle;
public Single<Object> getData {
if (data == null) {
if (dataSingle == null) {
dataSingle = Single.create(...)
.doOnSuccess( data -> this.data = data;)
.sibscribeOn(..);
}
return dataSingle;
} else {
return Single.just(data);
}
}
}