rxandroid просит работать на нити ui, хотя он подписан на AndroidSchedulers.mainThread()

Я написал подписчика, который будет запущен, когда будет OnCameraChangeListener карта Google OnCameraChangeListener.

Observable.create(new Observable.OnSubscribe<LatLng>()
    {
        @Override
        public void call(Subscriber<? super LatLng> subscriber)
        {
            if (!subscriber.isUnsubscribed())
            {
                mMap.setOnCameraChangeListener(cameraPosition ->
                        subscriber.onNext(cameraPosition.target));
            }
        }
    }).subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .onErrorResumeNext(Observable.<LatLng>empty())
            .debounce(1, TimeUnit.SECONDS)
            .subscribe(position -> {
                if (position.latitude != 0 && position.longitude != 0)
                {
                    updateLocationMarker(position);
                }
            });

Я обновляю маркер местоположения следующим образом:

private void updateLocationMarker(LatLng center)
{
    locationMarkertext.setText("Lat:" + center.latitude + " Long:" + center.longitude);
    //locationMarkerLayout.setVisibility(View.VISIBLE);
}

Хотя мой код говорит, что он запускается на AndroidSchedulers.mainThread() я получаю эту ошибку:

Вызывается: rx.exceptions.OnErrorNotImplementedException: только исходный поток, создавший иерархию представлений, может коснуться его представлений.

Может кто-нибудь, пожалуйста, помогите мне понять, в чем проблема с моим подходом

Ответы

Ответ 1

Оператор debounce по умолчанию запускается в планировщике вычислений, где он будет передавать события. Вы должны параметризовать его с помощью планировщика основного потока:

.debounce(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread())

Кроме того, в зависимости от того, где вы создаете источник уведомлений, вам может и не потребоваться observeOn и subscribeOn.

Ответ 2

Принятый ответ - неправильный способ сделать это. Правильнее было установить observeOn(AndroidSchedulers.mainThread()) после debounce(1, TimeUnit.SECONDS). Таким образом, ваш код становится:

Observable.create(new Observable.OnSubscribe<LatLng>()
    {
        @Override
        public void call(Subscriber<? super LatLng> subscriber)
        {
            if (!subscriber.isUnsubscribed())
            {
                mMap.setOnCameraChangeListener(cameraPosition ->
                        subscriber.onNext(cameraPosition.target));
            }
        }
    }).subscribeOn(AndroidSchedulers.mainThread())
            .onErrorResumeNext(Observable.<LatLng>empty())
            .debounce(1, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(position -> {
                if (position.latitude != 0 && position.longitude != 0)
                {
                    updateLocationMarker(position);
                }
            });

Debounce следует запускать на ComputationScheduler а не в основном потоке.