Ответ 1
TL; DR: переместите observeOn(AndroidSchedulers.mainThread())
ниже filter(...)
.
subscribeOn(...)
используется для обозначения того, на каком потоке начнет работать Observable
. Последующие вызовы subscribeOn
будут проигнорированы.
Таким образом, если вы должны написать следующее, все будет выполнено на Schedulers.newThread()
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
Теперь, конечно, это не то, что вы хотите: вы хотите doSomething
в основном потоке.
Вот где observeOn
встает на свои места. Все действия после observeOn
выполняются в этом планировщике. Поэтому в вашем примере в основном потоке выполняется filter
.
Вместо этого переместите observeOn
вниз до subscribe
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
Теперь filter
произойдет в "новом потоке" и doSomething
в основном потоке.
Чтобы идти еще дальше, вы можете использовать observeOn
несколько раз:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
В этом случае выборка произойдет в новом потоке, фильтрации в потоке вычислений и doSomething
в основном потоке.
Оформить заказ ReactiveX - оператор SubscribeOn для официальной документации.