Каков правильный способ обработки подписки в RxJava/RxAndroid для жизненного цикла активности?

Я только начинаю работать с RxJava/RxAndroid. Я хочу избежать утечек контекста, поэтому я создал BaseFragment следующим образом:

public abstract class BaseFragment extends Fragment {

    protected CompositeSubscription compositeSubscription = new CompositeSubscription();

    @Override
    public void onDestroy() {
        super.onDestroy();

        compositeSubscription.unsubscribe();
    } 
} 

И внутри моего фрагмента, который расширяет BaseFragment, я делаю это:

protected void fetchNewerObjects(){
        if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects

            Runtime.getRuntime().gc();//clean out memory if possible

            fetchNewObjectsSubscription = Observable
                .just(new Object1())
                .map(new Func1<Object1, Object2>() {
                    @Override
                    public Object2 call(Object1 obj1) {
                        //do bg stuff
                        return obj2;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object2>() {
                    @Override
                    public void onCompleted() {
                        compositeSubscription.remove(fetchNewObjectsSubscription);
                        fetchNewObjectsSubscription = null;
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(ArrayList<NewsFeedObject> newsFeedObjects) {
                        //do stuff
                    }
                });

        //add subscription to composite subscription so it can be unsubscribed onDestroy()
        compositeSubscription.add(fetchNewObjectsSubscription);
    }
}

protected boolean areNewerObjectsFetching(){
    if(fetchNewObjectsSubscription == null || fetchNewObjectsSubscription.isUnsubscribed()){ //if its either null or is in a finished status
        return false;
    }
    return true;
}

Итак, я думаю, мой вопрос в два раза:

  • Будет ли это останавливать утечку контекста, потому что я отменяю подписку onDestroy()?

  • И правильно ли я отслеживаю, что наблюдаемый "работает", установив подписку на null после завершения и проверив значение nullity?

Ответы

Ответ 1

  • Да, он остановится, но вы также должны установить подписку на null в onError (или после ошибки, вы не будете загружать элементы снова).

    Также не забывайте, что фрагмент можно остановить, но не уничтожить (например, в заднем стекле), и вы можете не захотеть наблюдать ничего в этом случае. Если вы перемещаете unsubscribe от onDestroy до onStop, не забудьте инициализировать compositeSubscription в onCreateView каждый раз, когда создается представление (потому что после того, как CompositeSubscription отменена, вы больше не можете добавлять подписки там).

  • Да, правильно. Но я думаю, что compositeSubscription.remove можно опустить, потому что вы уже проверяете значение null.

Ответ 2

Вам не нужна сторонняя библиотека для управления жизненным циклом деятельности. Попробуйте следующие коды:

public class LifecycleBinder {

    public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Activity target, LifecycleEvent event) {
        final Application app = target.getApplication();
        final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create();
        final Application.ActivityLifecycleCallbacks callbacks = new Application.ActivityLifecycleCallbacks() {
            @Override
            public void onActivityCreated(Activity activity, Bundle savedInstanceState) {

            }

            @Override
            public void onActivityStarted(Activity activity) {

            }

            @Override
            public void onActivityResumed(Activity activity) {

            }

            @Override
            public void onActivityPaused(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_PAUSED);
            }

            @Override
            public void onActivityStopped(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_STOPPED);
            }

            @Override
            public void onActivitySaveInstanceState(Activity activity, Bundle outState) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE);
            }

            @Override
            public void onActivityDestroyed(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }
        };

        app.registerActivityLifecycleCallbacks(callbacks);
        return subscribeUtilEvent(publishSubject, event, new Action0() {
            @Override
            public void call() {
                app.unregisterActivityLifecycleCallbacks(callbacks);
            }
        });
    }

    public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Fragment target, LifecycleEvent event) {
        final FragmentManager manager = target.getFragmentManager();
        if (manager == null) {
            throw new NullPointerException("fragment manager is null!");
        }

        final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create();
        final FragmentManager.FragmentLifecycleCallbacks callbacks = manager.new FragmentLifecycleCallbacks() {

            @Override
            public void onFragmentPreAttached(FragmentManager fm, Fragment f, Context context) {
            }

            @Override
            public void onFragmentAttached(FragmentManager fm, Fragment f, Context context) {
            }

            @Override
            public void onFragmentCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentActivityCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentViewCreated(FragmentManager fm, Fragment f, View v, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentStarted(FragmentManager fm, Fragment f) {
            }

            @Override
            public void onFragmentResumed(FragmentManager fm, Fragment f) {
            }

            @Override
            public void onFragmentPaused(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_PAUSED);
            }

            @Override
            public void onFragmentStopped(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_STOPPED);
            }

            @Override
            public void onFragmentSaveInstanceState(FragmentManager fm, Fragment f, Bundle outState) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE);
            }

            @Override
            public void onFragmentViewDestroyed(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_VIEW_DESTORYED);
            }

            @Override
            public void onFragmentDestroyed(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }

            @Override
            public void onFragmentDetached(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }
        };
        manager.registerFragmentLifecycleCallbacks(callbacks, true);

        return subscribeUtilEvent(publishSubject, event, new Action0() {
            @Override
            public void call() {
                manager.unregisterFragmentLifecycleCallbacks(callbacks);
            }
        });
    }

    private static <R, T> Observable.Transformer<R, R> subscribeUtilEvent(final Observable<T> source, final T event, final Action0 doOnComplete) {
        return new Observable.Transformer<R, R>() {
            @Override
            public Observable<R> call(Observable<R> rObservable) {
                return rObservable.takeUntil(takeUntilEvent(source, event)).doOnCompleted(doOnComplete);
            }
        };
    }

    private static <T> Observable<T> takeUntilEvent(final Observable<T> src, final T event) {
        return src.takeFirst(new Func1<T, Boolean>() {
            @Override
            public Boolean call(T lifecycleEvent) {
                return lifecycleEvent.equals(event);
            }
        });
    }
}

События жизненного цикла:

public enum LifecycleEvent {
    ON_PAUSED,
    ON_STOPPED,
    ON_SAVE_INSTANCE_STATE,
    ON_DESTROYED,
    ON_VIEW_DESTORYED,
    ON_DETACHED,
}

Использование:

myObservable
   .compose(LifecycleBinder.subscribeUtilEvent(this, LifecycleEvent.ON_DESTROYED))
   .subscribe();