Наблюдаемый пробег по основному потоку, хотя subscribeOn() вызывается в другом потоке

У меня возникла странная проблема в одном из моих действий. Когда я вернусь к съемке/видео, в моем onActivityResult я показываю диалог, который позволяет пользователю назвать камеру. Как только пользователь нажмет OK, я отправлю onNext() теме с запрошенным именем файла, который копирует файл (и показывает диалог выполнения).

По какой-то причине функция map(), которая делает копию, всегда вызывается в основном потоке, хотя я вызываю subscribeOn(Schedulers.io()).

@Override
protected void onActivityResult(final int requestCode, int resultCode, Intent intent) {
    ...

    final PublishSubject<String> subject = PublishSubject.create();`

    mSubscription = subject
            .subscribeOn(Schedulers.io())
            .map(new Func1<String, String>() {
                @Override
                public String call(String fileName) {
                    Log.I.d(TAG,"map");
                    return doSomeIOHeavyFuncition();
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<String>() {
                @Override
                public void call(final String fullPath) {
                    Log.d(TAG,"onNext");
                    doSomethingOnUI(fullPath);

                    subject.onCompleted();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    ...
                }
            }, new Action0() {
                @Override
                public void call() {
                    ...
                }
            });

    final AlertDialog dialog = new AlertDialog.Builder
    ....
    .create()
            .show();

    dialog.getButton(DialogInterface.BUTTON_POSITIVE)
            .setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View view) {
                    String someString = getStringFromDialog(dialog);

                    dialog.dismiss();
                    InputMethodManager imm = (InputMethodManager) getSystemService(Context.INPUT_METHOD_SERVICE);
                    imm.hideSoftInputFromWindow(input.getWindowToken(), 0);

                    showProgressDialog();
                    subject.onNext(someString);
                }
            });
}

Изменение вызова subscribeOn(Schedulers.io()) на observeOn(Schedulers.io()) решило проблему. Тем не менее, я хотел бы знать, почему это не сработало...

Ответы

Ответ 1

subscribeOn и observeOn - это в основном запутанные операторы. Первый гарантирует, что побочные эффекты подписки будут выполняться в указанном планировщике (потоке), но это не означает, что значения будут всплывать и в этом потоке.

Например, если ваш наблюдатель открывает сетевое соединение, когда подписывается на него, вы не хотите, чтобы это выполнялось в основном потоке, поэтому вам нужно subscribeOn указать, где эта подписка и, следовательно, будет создано сетевое соединение.

Когда данные, наконец, прибывают, излучающий поток может быть любым, одним из планировщиков или фоновым простым старым потоком. Поскольку мы не знаем или не любим этот поток, мы хотим переместить наблюдение за данными в другой поток. Это то, что наблюдается в watchOn: гарантирует, что операторы после этого будут выполнять свою логику onNext в указанном планировщике. Android-разработчики используют его уже для перемещения наблюдения значений обратно в основной поток.

То, что редко объясняется, это то, что происходит, когда вы хотите, чтобы дополнительные вычисления выходили из основного потока, прежде чем окончательный результат снова попадет в основной поток: используйте несколько операторов observeOn:

source
.observeOn(Schedulers.computation())
.map(v -> heavyCalculation(v))
.observeOn(Schedulers.io())
.doOnNext(v -> { saveToDB(v); })
.observeOn(AndroidSchedulers.mainThread())
...