RX: Пропустить Zipped Observables параллельно?
Итак, я играю с RX (действительно круто), и я конвертировал свой api, который обращается к базе данных sqlite в Android, чтобы вернуть наблюдаемые.
Поэтому, естественно, одна из проблем, которые я начал пытаться решить, заключается в следующем: "Что, если я хочу сделать 3 вызова API, получить результаты, а затем выполнить некоторую обработку, как только они закончатся?"
Мне потребовался час или 2, но в итоге я нашел Функцию Zip, и это помогает мне удобно:
Observable<Integer> one = getNumberedObservable(1);
Observable<Integer> two = getNumberedObservable(2);
Observable<Integer> three = getNumberedObservable(3);
Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
@Override
public Integer call(Integer arg0, Integer arg1, Integer arg2) {
System.out.println("Zip0: " + arg0);
System.out.println("Zip1: " + arg1);
System.out.println("Zip2: " + arg2);
return arg0 + arg1 + arg2;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer arg0) {
System.out.println("Zipped Result: " + arg0);
}
});
public static Observable<Integer> getNumberedObservable(final int value) {
return Observable.create(new OnSubscribeFunc<Integer>() {
@Override
public Subscription onSubscribe(Observer<? super Integer> observer) {
observer.onNext(value);
observer.onCompleted();
return Subscriptions.empty();
}
});
}
Отлично! Так что круто.
Итак, когда я застегиваю 3 наблюдаемых, они запускаются серийно. Что делать, если я хочу, чтобы все они работали параллельно одновременно, поэтому я получаю результаты быстрее? Я играл с несколькими вещами и даже пытался прочитать некоторые из оригинальных материалов RX, которые люди написали в С#. Я уверен, что есть простой ответ. Может кто-то указать мне верное направление? Каков правильный способ сделать это?
Ответы
Ответ 1
zip
выполняет запуск наблюдаемых параллельно, но также подписывается на них последовательно. Поскольку ваш getNumberedObservable
завершается в методе подписки, он создает впечатление, что он работает серийно, но на самом деле такого ограничения нет.
Вы можете либо попробовать с некоторыми длинными работающими Observables, которые переживают свою логику подписки, например timer
, либо использовать метод subscribeOn
для асинхронного подписания каждого потока, переданного в zip
.
Ответ 2
В RxJava используйте toAsync, чтобы превратить регулярную функцию в то, что будет работать в потоке и вернуть результат в наблюдаемый.
Я не знаю синтаксиса Java, что хорошо, но он будет выглядеть примерно так:
public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
return rx.util.functions.toAsync(new Func<Integer,Integer>() {
@Override
public Integer call(Integer value) { return getNumber(value); }
});
};
Это будет работать, если getNumber
действительно обращается к базе данных. Когда вы вызываете getNumberedObservable
, он возвращает наблюдаемый, который будет запускать getNumber
в отдельном потоке, когда вы подписываетесь на него.
Ответ 3
Я пытался сделать то же самое, параллельно используя несколько потоков, используя zip. Я закончил открытие нового вопроса и получил ответ. В принципе, вы должны подписывать каждый наблюдаемый на новый поток, поэтому , если вы хотите запускать три наблюдаемых параллельно с помощью zip, вы должны подписаться на 3 отдельных потока.