RX: Пропустить Zipped Observables параллельно?

Итак, я играю с RX (действительно круто), и я конвертировал свой api, который обращается к базе данных sqlite в Android, чтобы вернуть наблюдаемые.

Поэтому, естественно, одна из проблем, которые я начал пытаться решить, заключается в следующем: "Что, если я хочу сделать 3 вызова API, получить результаты, а затем выполнить некоторую обработку, как только они закончатся?"

Мне потребовался час или 2, но в итоге я нашел Функцию Zip, и это помогает мне удобно:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Отлично! Так что круто.

Итак, когда я застегиваю 3 наблюдаемых, они запускаются серийно. Что делать, если я хочу, чтобы все они работали параллельно одновременно, поэтому я получаю результаты быстрее? Я играл с несколькими вещами и даже пытался прочитать некоторые из оригинальных материалов RX, которые люди написали в С#. Я уверен, что есть простой ответ. Может кто-то указать мне верное направление? Каков правильный способ сделать это?

Ответы

Ответ 1

zip выполняет запуск наблюдаемых параллельно, но также подписывается на них последовательно. Поскольку ваш getNumberedObservable завершается в методе подписки, он создает впечатление, что он работает серийно, но на самом деле такого ограничения нет.

Вы можете либо попробовать с некоторыми длинными работающими Observables, которые переживают свою логику подписки, например timer, либо использовать метод subscribeOn для асинхронного подписания каждого потока, переданного в zip.

Ответ 2

В RxJava используйте toAsync, чтобы превратить регулярную функцию в то, что будет работать в потоке и вернуть результат в наблюдаемый.

Я не знаю синтаксиса Java, что хорошо, но он будет выглядеть примерно так:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

Это будет работать, если getNumber действительно обращается к базе данных. Когда вы вызываете getNumberedObservable, он возвращает наблюдаемый, который будет запускать getNumber в отдельном потоке, когда вы подписываетесь на него.

Ответ 3

Я пытался сделать то же самое, параллельно используя несколько потоков, используя zip. Я закончил открытие нового вопроса и получил ответ. В принципе, вы должны подписывать каждый наблюдаемый на новый поток, поэтому , если вы хотите запускать три наблюдаемых параллельно с помощью zip, вы должны подписаться на 3 отдельных потока.