Ответ 1
Кроме того, вы можете использовать мою библиотеку rxFirebase
Я использую Firebase в своем приложении вместе с RxJava. Firebase может уведомлять ваше приложение, когда что-то изменилось во внутренних файлах (добавление, удаление, изменения,...). Я пытаюсь объединить функцию Firebase с RxJava.
Данные, которые я слушаю, называется Leisure
, а Observable
испускает LeisureUpdate
который содержит Leisure
и тип обновления (добавление, удаление, перемещение, изменение).
Вот мой метод, который позволяет подписаться на эти события.
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;
@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
@Override
public void call(final Subscriber<? super LeisureUpdate> subscriber) {
leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
}
@Override
public void onCancelled(FirebaseError firebaseError) {
subscriber.onError(new Error(firebaseError.getMessage()));
}
});
}
});
}
leisureUpdatesSubscriptionsCount++;
return leisureUpdatesObservable;
}
Во-первых, я хотел бы использовать метод Observable.fromCallable()
для создания Observable
, но, я думаю, это невозможно, поскольку Firebase использует обратные вызовы, правильно?
Я сохраняю один экземпляр Observable
, чтобы всегда иметь один Observable
котором может подписаться несколько Subscriber
.
Проблема возникает, когда все отписываются, и мне нужно прекратить прослушивание событий в Firebase. Я все равно не нашел, чтобы Observable
понял, есть ли какая-либо подписка. Поэтому я продолжаю подсчитывать, сколько звонков я получил, чтобы subscribeToLeisuresUpdates()
, with leisureUpdatesSubscriptionsCount
.
Затем каждый раз, когда кто-то хочет отказаться от подписки, он должен позвонить
@Override
public void unsubscribeFromLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
return;
}
leisureUpdatesSubscriptionsCount--;
if (leisureUpdatesSubscriptionsCount == 0) {
firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
leisureUpdatesObservable = null;
}
}
Это единственный способ заставить Observable
испускать элементы, когда есть подписчик, но я чувствую, что должен быть более простой способ, особенно понимая, что больше нет подписчиков, слушающих наблюдаемые.
Любой, кто столкнулся с подобной проблемой или имеет другой подход?
Кроме того, вы можете использовать мою библиотеку rxFirebase
Вы можете использовать Observable.fromEmitter, что-то вдоль этих строк
return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() {
@Override
public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
// process update
LeisureUpdate leisureUpdate = ...
leisureUpdateEmitter.onNext(leisureUpdate);
}
@Override
public void onCancelled(DatabaseError databaseError) {
leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage()));
mDatabaseReference.removeEventListener(this);
}
};
mDatabaseReference.addValueEventListener(listener);
leisureUpdateEmitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
mDatabaseReference.removeEventListener(listener);
}
});
}
}, Emitter.BackpressureMode.BUFFER);
Поместите это в свой Observable.create() в конце.
subscriber.add(Subscriptions.create(new Action0() {
@Override public void call() {
ref.removeEventListener(leisureUpdatesListener);
}
}));
Я предлагаю вам проверить как ссылку (или просто использовать ее) в одной из следующих библиотек:
RxJava: https://github.com/nmoskalenko/RxFirebase
RxJava 2.0: https://github.com/FrangSierra/Rx2Firebase
Один из них работает с RxJava, а другой - с новым RC RxJava 2.0. Если вы заинтересованы в нем, вы можете увидеть различие между как здесь.
Вы можете попробовать этот подход Rx для Firebase: RxFirebase https://gist.github.com/gsoltis/86210e3259dcc6998801
Вот пример кода для использования RxJava2 с Firebase CompletionListener:
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey();
FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order,
new DatabaseReference.CompletionListener() {
@Override
public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) {
if (e.isDisposed()) {
return;
}
if (databaseError == null) {
e.onComplete();
} else {
e.onError(new Throwable(databaseError.getMessage()));
}
}
});
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Еще одна удивительная библиотека, которая поможет вам обернуть всю базу данных базы данных firebase в режиме rx.
https://github.com/Link184/Respiration
Здесь вы можете создать свой репозиторий firebase и расширить его из GeneralRepository, например:
@RespirationRepository(dataSnapshotType = Leisure.class)
public class LeisureRepository extends GeneralRepository<Leisure>{
protected LeisureRepository(Configuration<Leisure> repositoryConfig) {
super(repositoryConfig);
}
// observable will never emit any events if there are no more subscribers
public void killRepository() {
if (!behaviorSubject.hasObservers()) {
//protected fields
behaviorSubject.onComplete();
databaseReference.removeEventListener(valueListener);
}
}
}
Вы можете "убить" ваш репозиторий таким образом:
// LeisureRepositoryBuilder is a generated class by annotation processor, will appear after a successful gradle build
LeisureRepositoryBuilder.getInstance().killRepository();
Но я думаю, что для вашей ситуации будет лучше расширить com.link184.respiration.repository.ListRepository, чтобы избежать сопоставления данных из java.util.Map to Leisure модели через LeisureUpdate