Ответ 1
Чтобы ответить на ваш вопрос, позвольте мне начать с самого начала, это позволяет другим людям понять то, что вы уже знаете.
Планировщики
Планировщики играют ту же роль, что и Executors for Java. Вкратце - они решают, какие действия потока выполняются.
Обычно Observable и операторы выполняются в текущем потоке. Иногда вы можете передать Scheduler в Observable или оператор в качестве параметра (например, Observable.timer()).
Кроме того, RxJava предоставляет 2 оператора для указания планировщика:
- subscribeOn - укажите планировщик, на котором будет работать Observable
- наблюдать за - указать планировщик, на котором наблюдатель будет наблюдать эту наблюдаемую
Чтобы понять их быстро, я использую пример кода:
Во всех примерах я буду использовать помощник createObservable, который выдает имя потока, в котором работает Observable:
public static Observable<String> createObservable(){
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
Без планировщиков:
createObservable().subscribe(message -> {
System.out.println("Case 1 Observable thread " + message);
System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 1 Observable thread main
//Case 1 Observer thread main
SubscribeOn:
createObservable()
.subscribeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 2 Observable thread " + message);
System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 2 Observable thread RxNewThreadScheduler-1
//Case 2 Observer thread RxNewThreadScheduler-1
Подписаться и Наблюдать за:
reateObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 3 Observable thread " + message);
System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 3 Observable thread RxNewThreadScheduler-2
//Case 3 Observer thread RxNewThreadScheduler-1
ObserveOn:
createObservable()
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 4 Observable thread " + message);
System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 4 Observable thread main
//Case 4 Observer thread RxNewThreadScheduler-1
Ответ:
AndroidSchedulers.mainThread() возвращает шедулер, который делегирует работу MessageQueue, связанной с основным потоком.
Для этого он использует android.os.Looper.getMainLooper() и android.os.Handler.
Другими словами, если вы хотите указать конкретный поток, вы должны предоставить средства для планирования и выполнения задач в потоке.
Под ним может использоваться любой тип MQ для хранения задач и логики, которая зацикливает очередь и выполняет задачи.
В Java у нас есть Executor, который предназначен для таких задач. RxJava может легко создать планировщик из такого исполнителя.
Ниже приведен пример, который показывает, как вы можете наблюдать в основном потоке (не особенно полезно, но показывает все необходимые части).
public class RunCurrentThread implements Executor {
private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
RunCurrentThread sample = new RunCurrentThread();
sample.observerOnMain();
sample.runLoop();
}
private void observerOnMain() {
createObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.from(this))
.subscribe(message -> {
System.out.println("Observable thread " + message);
System.out.println("Observer thread " + Thread.currentThread().getName());
});
;
}
public Observable<String> createObservable() {
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
private void runLoop() throws InterruptedException {
while(!Thread.interrupted()){
tasks.take().run();
}
}
@Override
public void execute(Runnable command) {
tasks.add(command);
}
}
И последний вопрос, почему ваш код не заканчивается:
ThreadPoolExecutor по умолчанию использует не демоновские потоки, поэтому ваша программа не завершается, пока они не существуют. Вы должны использовать метод shutdown, чтобы закрыть потоки.