Как создать кеширование/горячую версию rx.Single?

В RxJava v1.0.13 введен новый тип наблюдаемого: rx.Single. Он отлично подходит для модели запроса-ответа, но не имеет стандартных побочных эффектов, вводящих операторов, таких как doOnNext(). Таким образом, гораздо сложнее сделать несколько вещей в результате.

Моя идея состояла в том, чтобы заменить doOnNext() несколькими подписками на один и тот же единственный экземпляр. Но это может привести к тому, что подстилающая работа будет выполняться несколько раз: один раз за каждую подписку.

Пример rx. Одна реализация:

private class WorkerSubscribe<SomeData>() : Single.OnSubscribe<SomeData> {
    override fun call(sub: SingleSubscriber<in SomeData>) {
        try {
            val result = fetchSomeData()
            sub.onSuccess(result)
        } catch(t: Throwable) {
            sub.onError(t)
        }
    }
}

val single = Single.create<SomeData>(WorkerSubscribe())

Использование:

single.subscribe({}, {})
single.subscribe({}, {})   // Data is fetched for the second time

Возможно ли создать экземпляр Single, который не будет fetchSomeData() несколько раз, даже когда single.subscribe() вызывается несколько раз, но кеш и возвращает тот же результат?

Ответы

Ответ 1

Вам нужно Тема RxJava: BehaviorSubject или AsyncSubject

Ответ 2

Мне просто нужно было подобное поведение и нашел какое-то решение.

Вы можете преобразовать Single в Observable применить cache(), а затем преобразовать его обратно в Single.

yourSingle.toObservable().cacheWithInitialCapacity(1).toSingle()

Я использую cacheWithInitialCapacity(1) вместо просто cache() в качестве оптимизации - Single никогда не будет выпускать более одного элемента.

 

Также неплохо обеспечить реализацию Transformer

public class SingleUtils {

    public static <T> Single.Transformer<T, T> cached() {
        return single -> single.toObservable()
            .cacheWithInitialCapacity(1)
            .toSingle();
    }
}

чтобы вы могли использовать кеширование, где бы вы ни находились, вызывая только

yourSingle.compose(SingleUtils.cached())

Edit: Начиная с rxJava 1.2.2 он был добавлен (https://github.com/ReactiveX/RxJava/releases/tag/v1.2.2)

Реализовано именно так (https://github.com/ReactiveX/RxJava/pull/4757)

Ответ 3

Пожалуйста, проверьте cache() оператор. Он должен кэшировать выбросы от Observable и воспроизводить их до следующих Subscriber s.

Ответ 4

Вы можете создать BehaviorSubject/ReplaySubject/AsyncSubject - и затем вызвать onSingle на нем.

Ответ 5

Я сделал обходной путь, что меня это не устраивает, но он работает:

public class Network {
   private Object data;
   private Single<Object> dataSingle;

   public Single<Object> getData {
      if (data == null) {
         if (dataSingle == null) {
            dataSingle = Single.create(...)
             .doOnSuccess( data -> this.data = data;)
             .sibscribeOn(..);
         }
         return dataSingle;
      } else {
         return Single.just(data);
      }
   }
}