Rx Java mergeDelayError не работает должным образом

Я использую приложение RxJava и Android с RxAndroid. Я использую mergeDelayError, чтобы объединить два сетевых сетевых адаптационных привязки в один наблюдаемый, который будет обрабатывать испускаемые элементы, если он испускает один, и ошибку, если он имеет один, Это не работает, и оно только активирует действие onError, когда либо встречается ошибка. Теперь, чтобы проверить это, я перешел на очень простой пример, и все еще успехAction никогда не вызывается, когда у меня есть вызов onError. См. Пример ниже.

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .finallyDo(completeAction)
            .subscribe(successAction, errorAction);

Действие успеха будет вызвано только при использовании двух наблюдаемых результатов. Я что-то пропустил, как должно работать mergeDelayError?

EDIT:

Я обнаружил, что если я удалю observeOn и subscribeOn, все будет работать так, как ожидалось. Мне нужно указать потоки и подумать, что это была цель использования Rx. Любая идея, почему указание этих Schedulers нарушит поведение?

Ответы

Ответ 1

Это все еще кажется ошибкой в ​​операторе mergeDelayError, но я смог заставить его работать, дублируя ObserverOn и подписываясь на каждый наблюдаемый.

Observable.mergeDelayError(
            Observable.error(new RuntimeException())
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io()),
            Observable.just("Hello")
                 .observeOn(AndroidSchedulers.mainThread())
                 .subscribeOn(Schedulers.io())
        )
        .finallyDo(completeAction)
        .subscribe(successAction, errorAction);

Ответ 2

Используйте .observeOn(AndroidSchedulers.mainThread(), true) вместо .observeOn(AndroidSchedulers.mainThread()

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
        return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
    }

Выше была подпись функции observOn. Работает следующий код.

  Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
        )
                .observeOn(AndroidSchedulers.mainThread(), true)
                .subscribeOn(Schedulers.io())
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {

                    }
                });

Получил этот трюк из потока ConcatDelayError: https://github.com/ReactiveX/RxJava/issues/3908#issuecomment-217999009

Ответ 3

Я думаю, что вы не дождались события терминала, и основной поток завершается до того, как события будут доставлены вашему наблюдателю. Следующий тест проходит для меня с RxJava 1.0.14:

@Test
public void errorDelayed() {
    TestSubscriber<Object> ts = TestSubscriber.create();
    Observable.mergeDelayError(
            Observable.error(new RuntimeException()),
            Observable.just("Hello")
        )
        .subscribeOn(Schedulers.io()).subscribe(ts);

    ts.awaitTerminalEvent();

    ts.assertError(RuntimeException.class);
    ts.assertValue("Hello");
}