Процесс наблюдения на фоновом потоке

Я использую RxAndroid для потоков. В моем реальном случае использования я получаю список с сервера (с помощью Retrofit). Я использую планировщики для работы над фоновым потоком и получаю окончательную эмиссию в потоке Android UI (основной).

Это хорошо работает для сетевого вызова, однако я понял, что мои операторы после сетевого вызова не используют фоновый поток, а получают вызов в основном потоке.

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});

Как я могу убедиться, что все операции выполняются в фоновом потоке?

Ответы

Ответ 1

TL; DR: переместите observeOn(AndroidSchedulers.mainThread()) ниже filter(...).


subscribeOn(...) используется для обозначения того, на каком потоке начнет работать Observable. Последующие вызовы subscribeOn будут проигнорированы.

Таким образом, если вы должны написать следующее, все будет выполнено на Schedulers.newThread():

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> { doSomething(integer1); });

Теперь, конечно, это не то, что вы хотите: вы хотите doSomething в основном потоке.
Вот где observeOn встает на свои места. Все действия после observeOn выполняются в этом планировщике. Поэтому в вашем примере в основном потоке выполняется filter.

Вместо этого переместите observeOn вниз до subscribe:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });

Теперь filter произойдет в "новом потоке" и doSomething в основном потоке.


Чтобы идти еще дальше, вы можете использовать observeOn несколько раз:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1) });

В этом случае выборка произойдет в новом потоке, фильтрации в потоке вычислений и doSomething в основном потоке.

Оформить заказ ReactiveX - оператор SubscribeOn для официальной документации.