Как создать Обозреватель в динамическом списке в RxJava?

Мне нужно создать Observer над массивом, который постоянно изменяется (добавление элементов).

Я использую Obserable.from(Iterable), но кажется, что он создает Observable над ArrayList, как это происходит в момент создания.

Мне нужно, чтобы Observer был уведомлен и действие должно выполняться каждый раз, когда ArrayList получает новый элемент.

Ответы

Ответ 1

Там вы идете. Благодаря Dávid Karnok в RxJava Google Group

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

public class ObservableListExample {

    public static class ObservableList<T> {

        protected final List<T> list;
        protected final PublishSubject<T> onAdd;

        public ObservableList() {
            this.list = new ArrayList<T>();
            this.onAdd = PublishSubject.create();
        }
        public void add(T value) {
            list.add(value);
            onAdd.onNext(value);
        }
        public Observable<T> getObservable() {
            return onAdd;
        }
    }

    public static void main(String[] args) throws Exception {
        ObservableList<Integer> olist = new ObservableList<>();

        olist.getObservable().subscribe(System.out::println);

        olist.add(1);
        Thread.sleep(1000);
        olist.add(2);
        Thread.sleep(1000);
        olist.add(3);
    }
}

Ответ 2

Вы можете объединить два наблюдаемых в один. Один из них может быть исходным списком элементов, а второй может быть предметом:

import rx.Observable;
import rx.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;

public class ExampleObservableList {
    public static void main(String[] args) {
        List<Integer> initialNumbers = new ArrayList<Integer>();
        initialNumbers.add(1);
        initialNumbers.add(2);

        Observable<Integer> observableInitial = Observable.from(initialNumbers);
        ReplaySubject<Integer> subject = ReplaySubject.create();

        Observable<Integer> source = Observable.merge(observableInitial, subject);

        source.subscribe(System.out::println);

        for (int i = 0; i < 100; ++i) {
            subject.onNext(i);
        }
    }

}

Если у вас нет исходных элементов, вы можете использовать только ReplaySubject (или другие Subject → см. http://reactivex.io/documentation/subject.html)

public static void main(String[] args) {
    ReplaySubject<Integer> source = ReplaySubject.create();
    source.subscribe(System.out::println);

    for (int i = 0; i < 100; ++i) {
        source.onNext(i);
    }
}

Ответ 3

Я бы рассмотрел этот подход, основанный на BehaviourSubject. Это отличается от решения juanpavergara тем, что onNext() будет немедленно отправлен Наблюдателю при подписке на Observable.

public class ObservableList<T> {

    protected final List<T> list;
    protected final BehaviorSubject<List<T>> behaviorSubject;

    public ObservableList(List<T> list) {
        this.list = list;
        this.behaviorSubject = BehaviorSubject.create(list);
    }

    public Observable<List<T>> getObservable() {
        return behaviorSubject;
    }

    public void add(T element) {
        list.add(element);
        behaviorSubject.onNext(list);
    }
}


private void main() {
    final List<Integer> list = new ArrayList<>();
    list.add(0);
    list.add(1);

    final ObservableList<Integer> olist = new ObservableList<>(list);

    olist.getObservable().subscribe(System.out::println);

    olist.add(2);
    olist.add(3);
}

Это решение может быть полезно при реализации MVP, когда вы хотите наблюдать один ресурс (т.е. список объектов), возвращаемый одним компонентом в системе (то есть: один репозиторий или DataSource), и вы хотите, чтобы Observer (т.е.: Presenter или Interactor) для уведомления, когда элемент добавляется в список в другой части системы.