RxJava: вызов отписки изнутри onNext
Интересно, можно ли закончить вызов unsubscribe
из обработчика onNext
следующим образом:
List<Integer> gatheredItems = new ArrayList<>();
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
public void onNext(Integer item) {
gatheredItems.add(item);
if (item == 3) {
unsubscribe();
}
}
public void onCompleted() {
// noop
}
public void onError(Throwable sourceError) {
// noop
}
};
Observable<Integer> source = Observable.range(0,100);
source.subscribe(subscriber);
sleep(1000);
System.out.println(gatheredItems);
Вышеуказанный код правильно выводит, что было собрано всего четыре элемента: [0, 1, 2, 3]
. Но если кто-то изменит источник, наблюдаемый для кэширования:
Observable<Integer> source = Observable.range(0,100).cache();
Затем собраны все сто элементов. У меня нет контроля над наблюдаемым источником (независимо от того, кэшируется он или нет), поэтому как определенно отказаться от подписки из onNext
?
Кстати: Так что отказаться от подписки в onNext
неправильно?
(Мой фактический прецедент заключается в том, что в onNext
я на самом деле записываю поток вывода, а когда IOException
происходит, ничего больше не может быть записано на выход, поэтому мне нужно как-то прекратить дальнейшую обработку.)
Ответы
Ответ 1
Кэш() кэширует все, как только первый абонент приходит, и он не дает никаких опций, чтобы остановить его из нисходящего потока. Вам нужно остановить поток перед кешем(), чтобы избежать слишком большого удержания:
Observable<Integer> source = Observable.range(1, 100);
PublishSubject<Integer> stop = PublishSubject.create();
source
.doOnNext(v -> System.out.println("Generating " + v))
.takeUntil(stop)
.cache()
.doOnNext(new Action1<Integer>() {
int calls;
@Override
public void call(Integer t) {
System.out.println("Saving " + t);
if (++calls == 3) {
stop.onNext(1);
}
}
})
.subscribe();
Изменить: приведенный выше пример не работает ниже 1.0.13, поэтому вот версия, которая должна:
SerialSubscription ssub = new SerialSubscription();
ConnectableObservable<Integer> co = source
.doOnNext(v -> System.out.println("Generating " + v))
.replay();
co.doOnNext(v -> {
System.out.println("Saving " + v);
if (v == 3) {
ssub.unsubscribe();
}
})
.subscribe();
co.connect(v -> ssub.set(v));