Сочетание слушателя данных Firebase в реальном времени с RxJava

Я использую Firebase в своем приложении вместе с RxJava. Firebase может уведомлять ваше приложение, когда что-то изменилось во внутренних файлах (добавление, удаление, изменения,...). Я пытаюсь объединить функцию Firebase с RxJava.

Данные, которые я слушаю, называется Leisure, а Observable испускает LeisureUpdate который содержит Leisure и тип обновления (добавление, удаление, перемещение, изменение).

Вот мой метод, который позволяет подписаться на эти события.

private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;

@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {

            @Override
            public void call(final Subscriber<? super LeisureUpdate> subscriber) {
                leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String s) {
                        final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
                        subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
                    }

                    @Override
                    public void onCancelled(FirebaseError firebaseError) {
                        subscriber.onError(new Error(firebaseError.getMessage()));
                    }
                });
            }
        });
    }
    leisureUpdatesSubscriptionsCount++;
    return leisureUpdatesObservable;
}

Во-первых, я хотел бы использовать метод Observable.fromCallable() для создания Observable, но, я думаю, это невозможно, поскольку Firebase использует обратные вызовы, правильно?

Я сохраняю один экземпляр Observable, чтобы всегда иметь один Observable котором может подписаться несколько Subscriber.

Проблема возникает, когда все отписываются, и мне нужно прекратить прослушивание событий в Firebase. Я все равно не нашел, чтобы Observable понял, есть ли какая-либо подписка. Поэтому я продолжаю подсчитывать, сколько звонков я получил, чтобы subscribeToLeisuresUpdates(), with leisureUpdatesSubscriptionsCount.

Затем каждый раз, когда кто-то хочет отказаться от подписки, он должен позвонить

@Override
public void unsubscribeFromLeisuresUpdates() {
    if (leisureUpdatesObservable == null) {
        return;
    }
    leisureUpdatesSubscriptionsCount--;
    if (leisureUpdatesSubscriptionsCount == 0) {
        firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
        leisureUpdatesObservable = null;
    }
}

Это единственный способ заставить Observable испускать элементы, когда есть подписчик, но я чувствую, что должен быть более простой способ, особенно понимая, что больше нет подписчиков, слушающих наблюдаемые.

Любой, кто столкнулся с подобной проблемой или имеет другой подход?

Ответы

Ответ 1

Кроме того, вы можете использовать мою библиотеку rxFirebase

Ответ 2

Вы можете использовать Observable.fromEmitter, что-то вдоль этих строк

    return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
        @Override
        public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
            final ValueEventListener listener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot dataSnapshot) {
                    // process update
                    LeisureUpdate leisureUpdate = ...
                    leisureUpdateEmitter.onNext(leisureUpdate);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
                    mDatabaseReference.removeEventListener(this);
                }
            };
            mDatabaseReference.addValueEventListener(listener);
            leisureUpdateEmitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    mDatabaseReference.removeEventListener(listener);
                }
            });
        }
    }, Emitter.BackpressureMode.BUFFER);

Ответ 3

Поместите это в свой Observable.create() в конце.

subscriber.add(Subscriptions.create(new Action0() {
                    @Override public void call() {
                        ref.removeEventListener(leisureUpdatesListener);
                    }
                }));

Ответ 4

Я предлагаю вам проверить как ссылку (или просто использовать ее) в одной из следующих библиотек:

RxJava: https://github.com/nmoskalenko/RxFirebase

RxJava 2.0: https://github.com/FrangSierra/Rx2Firebase

Один из них работает с RxJava, а другой - с новым RC RxJava 2.0. Если вы заинтересованы в нем, вы можете увидеть различие между как здесь.

Ответ 6

Вот пример кода для использования RxJava2 с Firebase CompletionListener:

Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey();
            FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order,
                    new DatabaseReference.CompletionListener() {
                        @Override
                        public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) {
                            if (e.isDisposed()) {
                                return;
                            }
                            if (databaseError == null) {
                                e.onComplete();
                            } else {
                                e.onError(new Throwable(databaseError.getMessage()));
                            }
                        }
                    });

        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Ответ 7

Еще одна удивительная библиотека, которая поможет вам обернуть всю базу данных базы данных firebase в режиме rx.

https://github.com/Link184/Respiration

Здесь вы можете создать свой репозиторий firebase и расширить его из GeneralRepository, например:

@RespirationRepository(dataSnapshotType = Leisure.class)
public class LeisureRepository extends GeneralRepository<Leisure>{
    protected LeisureRepository(Configuration<Leisure> repositoryConfig) {
        super(repositoryConfig);
    }

    // observable will never emit any events if there are no more subscribers
    public void killRepository() {
        if (!behaviorSubject.hasObservers()) {
            //protected fields
            behaviorSubject.onComplete();
            databaseReference.removeEventListener(valueListener);
        }
    }
}

Вы можете "убить" ваш репозиторий таким образом:

// LeisureRepositoryBuilder is a generated class by annotation processor, will appear after a successful gradle build
LeisureRepositoryBuilder.getInstance().killRepository();

Но я думаю, что для вашей ситуации будет лучше расширить com.link184.respiration.repository.ListRepository, чтобы избежать сопоставления данных из java.util.Map to Leisure модели через LeisureUpdate