Как создать Обозреватель в динамическом списке в RxJava?
Мне нужно создать Observer над массивом, который постоянно изменяется (добавление элементов).
Я использую Obserable.from(Iterable), но кажется, что он создает Observable над ArrayList, как это происходит в момент создания.
Мне нужно, чтобы Observer был уведомлен и действие должно выполняться каждый раз, когда ArrayList получает новый элемент.
Ответы
Ответ 1
Там вы идете. Благодаря Dávid Karnok в RxJava Google Group
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.subjects.PublishSubject;
public class ObservableListExample {
public static class ObservableList<T> {
protected final List<T> list;
protected final PublishSubject<T> onAdd;
public ObservableList() {
this.list = new ArrayList<T>();
this.onAdd = PublishSubject.create();
}
public void add(T value) {
list.add(value);
onAdd.onNext(value);
}
public Observable<T> getObservable() {
return onAdd;
}
}
public static void main(String[] args) throws Exception {
ObservableList<Integer> olist = new ObservableList<>();
olist.getObservable().subscribe(System.out::println);
olist.add(1);
Thread.sleep(1000);
olist.add(2);
Thread.sleep(1000);
olist.add(3);
}
}
Ответ 2
Вы можете объединить два наблюдаемых в один.
Один из них может быть исходным списком элементов, а второй может быть предметом:
import rx.Observable;
import rx.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;
public class ExampleObservableList {
public static void main(String[] args) {
List<Integer> initialNumbers = new ArrayList<Integer>();
initialNumbers.add(1);
initialNumbers.add(2);
Observable<Integer> observableInitial = Observable.from(initialNumbers);
ReplaySubject<Integer> subject = ReplaySubject.create();
Observable<Integer> source = Observable.merge(observableInitial, subject);
source.subscribe(System.out::println);
for (int i = 0; i < 100; ++i) {
subject.onNext(i);
}
}
}
Если у вас нет исходных элементов, вы можете использовать только ReplaySubject
(или другие Subject
→ см. http://reactivex.io/documentation/subject.html)
public static void main(String[] args) {
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(System.out::println);
for (int i = 0; i < 100; ++i) {
source.onNext(i);
}
}
Ответ 3
Я бы рассмотрел этот подход, основанный на BehaviourSubject.
Это отличается от решения juanpavergara тем, что onNext() будет немедленно отправлен Наблюдателю при подписке на Observable.
public class ObservableList<T> {
protected final List<T> list;
protected final BehaviorSubject<List<T>> behaviorSubject;
public ObservableList(List<T> list) {
this.list = list;
this.behaviorSubject = BehaviorSubject.create(list);
}
public Observable<List<T>> getObservable() {
return behaviorSubject;
}
public void add(T element) {
list.add(element);
behaviorSubject.onNext(list);
}
}
private void main() {
final List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
final ObservableList<Integer> olist = new ObservableList<>(list);
olist.getObservable().subscribe(System.out::println);
olist.add(2);
olist.add(3);
}
Это решение может быть полезно при реализации MVP, когда вы хотите наблюдать один ресурс (т.е. список объектов), возвращаемый одним компонентом в системе (то есть: один репозиторий или DataSource), и вы хотите, чтобы Observer (т.е.: Presenter или Interactor) для уведомления, когда элемент добавляется в список в другой части системы.