Как получить уведомление о действии отмены подписки наблюдателя в пользовательском Observable в RxJava

Я пытаюсь привязать API-интерфейс на основе прослушивателя к наблюдаемому. Мой код выглядит примерно следующим образом.

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)
})

Тем не менее, я хочу, чтобы мое наблюдение немедленно удалило слушателя из основного существующего источникаEventSource, когда наблюдатель вызывает subscription.unscribe(). Как я мог достичь этой цели?

Ответы

Ответ 1

В абстрактном классе Subscriber есть метод add, который позволяет вам добавить Subscription, который будет отписано подписчиком.

def myObservable = Observable.create({ aSubscriber ->
    val listener = {event -> 
      aSubscriber.onNext(event);                
    }
    existingEventSource.addListener(listener)

    // Adds a lambda to be executed when the Subscriber un-subscribes from your Observable
    aSubscriber.add(Subscriptions.create(() -> existingEventSource.removeListener(listener)));
})

Подумайте о aSubscriber как Observer, который подписался на ваш Observable; мы назовем его Subscriber. Пока Subscriber все еще подписывается на Observable, Observable может испускать значения. Но когда этот Subscriber не подписан, тогда он должен остановиться. Но если мы хотим получать уведомления, когда Subscriber не подписывается, мы можем зарегистрировать Action для запуска, когда это произойдет. Для этого используется метод add. Как упоминалось в комментариях @dwursteisen; вы в основном регистрируете лямбду, которая будет выполняться, когда подписчик не подписался.

Также возможно, чтобы Подписка была отписана на другом Планировщике. См. MainThreadSubscription из проекта rxanroid для примера того, как этого достичь.

Вот пример того, как вы будете использовать его для отмены подписки на основной поток

aSubscriber.add(new MainThreadSubscription() {
    @Override
    protected void onUnsubscribe() {
        existingEventSource.removeListener(listener);
    }
});