ReactiveX concat не производит onNext от первого наблюдаемого, если второй не сразу
Я объединяю два наблюдаемых, чтобы сначала отображать данные из кеша и после этого запускать загрузку данных из сети и отображать обновленные данные.
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.subscibeOn(networkScheduler)
).observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Если нет сетевого подключения, второй наблюдаемый сбой сразу после вызова OnSubscribe.
В случае сбоя второго наблюдаемого немедленно теряются данные с первого наблюдаемого. Метод onNext никогда не вызывается в подписчике.
Я думаю, это может быть связано с следующим кодом в OperatorConcat.ConcatSubscriber
@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}
@Override
public void onError(Throwable e) {
child.onError(e);
unsubscribe();
}
Похоже, что после получения ошибки он отписывается, и все ожидающие onNext теряются.
Каков наилучший способ решить мою проблему?
Обновление
Похоже, я нашел решение, вместо того, чтобы устанавливать наблюдение для конкатенированных наблюдаемых, я устанавливаю для каждого наблюдаемого наблюдаемое.
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
Ответы
Ответ 1
Поведение по умолчанию observeOn
том, что события onError
могут onError
перед очередью, вот цитата из документации:
Обратите внимание, что уведомления onError
будут обрезаться раньше, onNext
уведомления onNext
в потоке onNext
если Scheduler
действительно асинхронный.
Вот небольшой тест, чтобы проиллюстрировать это:
Scheduler newThreadScheduler = Schedulers.newThread();
Observable<Integer> stream = Observable.create(integerEmitter -> {
integerEmitter.onNext(1);
integerEmitter.onNext(2);
integerEmitter.onNext(3);
integerEmitter.onNext(4);
integerEmitter.onNext(5);
integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
.observeOn(newThreadScheduler).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
Обычно потребитель ожидает следующую последовательность: 1> 2> 3> 4> 5> Error
. Но использование только observeOn
может привести к ошибке, и тест не observeOn
.
Это поведение было реализовано давно здесь https://github.com/ReactiveX/RxJava/issues/1680, проверьте мотивацию, почему это было сделано так. Чтобы избежать такого поведения, можно использовать перегруженный observeOn
с параметром delayError
:
указывает, может onError
уведомление onError
не обрезаться раньше уведомления onNext
на другой стороне границы планирования. Если true
последовательность, заканчивающаяся на onError
будет воспроизведена в том же порядке, в котором была получена от восходящего потока.
Это то, что вы обычно ожидаете, поэтому изменение observeOn(newThreadScheduler)
на observeOn(newThreadScheduler, true)
исправит тест.
Тогда на вопрос @Neil: почему работает решение, предложенное @Rostyslav? Это работает, потому что нет никакого переключателя потока для окончательной последовательности.
В предлагаемом решении конечная последовательность создается из двух последовательностей в одном потоке: 1-я последовательность - это данные из кэша, 2-я последовательность - просто ошибка из сети. Они создаются вместе в одном потоке, и после того, как нет переключения потоков, подписчик наблюдает за AndroidSchedulers.mainThread()
. Если вы попытаетесь изменить окончательный Scheduler
на какой-то другой, он снова потерпит неудачу.
Ответ 2
Операторы в RxJava предназначены для короткого замыкания уведомлений об обходах в целом. Поскольку наблюдаемые конкатенированные являются асинхронными источниками, вы испытываете короткое замыкание. Если вы не хотите короткого замыкания, вы можете сделать concat на материализованных наблюдаемых, а затем выполнить требуемую обработку:
Observable.concat(
getContentFromCache.materialize().subscribeOn(dbScheduler),
getContentFromNetwork.materialize().subscribeOn(networkScheduler)
)
Другим подходом было бы использовать onErrorResumeNext
:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.onErrorResumeNext(something)
.subscibeOn(networkScheduler)
)
Ответ 3
@Rostyslav, ваше обновление, безусловно, делает трюк. Я боролся с сценарием EXACT, который вы описываете. Можете ли вы объяснить, почему это работает? Я не совсем понимаю, как перемещалось наблюдение с внутренними наблюдаемыми, где это не было на конкатенаблюдении.
Ответ 4
This is the small Example for concat both Observable
// First Observable //
private Observable<Book> getAutobiography()
{
String [] booknames= new String[]{"book1","book2"};
final ArrayList<Book> arrayList = new ArrayList<>();
for(String name:booknames){
Book book = new Book();
book.setBooktype("Autobiograph");
book.setBookname(name);
arrayList.add(book);
}
return Observable.create(new ObservableOnSubscribe<Book>() {
@Override
public void subscribe(ObservableEmitter<Book> emitter) throws Exception {
for(Book book:arrayList)
{
emitter.onNext(book);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
// Second Observable
private Observable<Book> getProgrammingBooks()
{
String [] booknames= new String[]{"book3","book4"};
final ArrayList<Book> arrayList = new ArrayList<>();
for(String name:booknames){
Book book = new Book();
book.setBooktype("Programming");
book.setBookname(name);
arrayList.add(book);
}
return Observable.create(new ObservableOnSubscribe<Book>() {
@Override
public void subscribe(ObservableEmitter<Book> emitter) throws Exception {
for(Book book:arrayList)
{
emitter.onNext(book);
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
}).subscribeOn(Schedulers.io());
}
// Concat them
Observable.concat(getProgrammingBooks(),getAutobiography()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Book>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Book book) {
Log.d("bookname",""+book.getBookname());
Log.d("booktype",""+book.getBooktype());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// it print both values //