RxJava: как составить несколько Observables с зависимостями и собрать все результаты в конце?
Я изучаю RxJava и, как мой первый эксперимент, пытается переписать код в первом методе run()
в этом коде (цитируется в блог Netflix как проблема RxJava может помочь решить), чтобы улучшить его асинхронность с использованием RxJava, т.е. он не ждет результата первого Будущего (f1.get()
), прежде чем продолжить к остальной части кода.
f3
зависит от f1
. Я вижу, как с этим справиться, flatMap
, похоже, делает трюк:
Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
}
});
Далее f4
и f5
зависят от f2
. У меня есть это:
final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer i) {
Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
return Observable.merge(f4Observable, f5Observable);
}
});
Что начинает становиться странным (merge
их, вероятно, не то, что я хочу...), но позволяет мне делать это в конце, не совсем то, что я хочу:
f3Observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Observed from f3: " + s);
f4And5Observable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("Observed from f4 and f5: " + i);
}
});
}
});
Это дает мне:
Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100
который является всеми числами, но, к сожалению, я получаю результаты в отдельных вызовах, поэтому я не могу полностью заменить окончательный println в исходном коде:
System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
Я не понимаю, как получить доступ к обеим возвращаемым значениям в одной строке. Я думаю, что там, вероятно, есть некоторые функциональные программы, которые мне не хватает. Как я могу это сделать? Спасибо.
Ответы
Ответ 1
Похоже, все, что вам действительно нужно, - это немного больше поддержки и перспективы использования RX. Я бы посоветовал вам больше ознакомиться с документацией, а также с мраморными диаграммами (я знаю, что они не всегда полезны). Я также предлагаю изучить функцию lift()
и операторы.
- Вся точка наблюдаемого заключается в объединении потока данных и манипуляции данными с одним объектом.
- Точка вызовов
map
, flatMap
и filter
должна обрабатывать данные в потоке данных.
- Точка слияния состоит в объединении потоков данных
-
Точка операторов должна позволять вам нарушать постоянный поток наблюдаемых и определять собственные операции над потоком данных. Например, я закодировал оператора скользящей средней. Это суммирует n
double
в наблюдении удвоений, чтобы вернуть поток скользящих средних. Код буквально выглядел следующим образом:
Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))
Вы почувствуете облегчение, что многие методы фильтрации, которые вы считаете само собой разумеющимися, имеют lift()
под капотом.
Сказанное; все, что требуется для объединения нескольких зависимостей:
- изменение всех входящих данных на стандартный тип данных с помощью
map
или flatMap
- объединение стандартных типов данных в поток
- с использованием настраиваемых операторов, если одному объекту нужно ждать по другому, или если вам нужно заказывать данные в потоке. Внимание: этот подход замедлит поток вниз.
- используя список или подписку на сбор всех этих данных.
Ответ 2
Изменить: кто-то преобразовал следующий текст, который я добавил в качестве редактирования в вопросе, в ответ, который я ценю, и понимаю, может быть, правильно, что нужно делать, однако я не считайте это ответом, потому что это явно не правильный способ сделать это. Я бы никогда не использовал этот код и не советую кому-либо его скопировать. Другие/лучшие решения и комментарии приветствуются!
Я смог решить эту проблему следующим образом. Я не знал, что вы могли бы наблюдаться не один раз, я полагал, что результаты могут быть использованы только один раз. Поэтому я просто flatMap
f2Observable дважды (извините, я переименовал некоторые вещи в коде с момента моего первоначального сообщения), затем zip
на всех наблюдаемых, а затем подписаться на это. То, что Map
в zip
для агрегирования значений нежелательно из-за типа жонглирования. Другие/лучшие решения и комментарии приветствуются! полный код доступен для просмотра. Спасибо.
Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("Observed from f2: " + integer);
Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
return Observable.from(f4);
}
});
Observable<Integer> f5Observable = f2Observable
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
System.out.println("Observed from f2: " + integer);
Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
return Observable.from(f5);
}
});
Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
@Override
public Map<String, String> call(String s, Integer integer, Integer integer2) {
Map<String, String> map = new HashMap<String, String>();
map.put("f3", s);
map.put("f4", String.valueOf(integer));
map.put("f5", String.valueOf(integer2));
return map;
}
}).subscribe(new Action1<Map<String, String>>() {
@Override
public void call(Map<String, String> map) {
System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
}
});
И это дает мне желаемый результат:
responseB_responseA => 714000
Ответ 3
Я думаю, что вы ищете коммутатор. Мы столкнулись с аналогичной проблемой, когда у нас есть сеансовая служба, которая обрабатывает новый сеанс из api, и нам нужен этот сеанс, прежде чем мы сможем получить больше данных. Мы можем добавить к наблюдаемому сеансу, который возвращает sessionToken для использования в нашем вызове данных.
getSession возвращает наблюдаемый;
public getSession(): Observable<any>{
if (this.sessionToken)
return Observable.of(this.sessionToken);
else if(this.sessionObservable)
return this.sessionObservable;
else {
// simulate http call
this.sessionObservable = Observable.of(this.sessonTokenResponse)
.map(res => {
this.sessionObservable = null;
return res.headers["X-Session-Token"];
})
.delay(500)
.share();
return this.sessionObservable;
}
}
и getData принимает это наблюдаемое и добавляет к нему.
public getData() {
if (this.dataObservable)
return this.dataObservable;
else {
this.dataObservable = this.sessionService.getSession()
.switchMap((sessionToken:string, index:number) =>{
//simulate data http call that needed sessionToken
return Observable.of(this.dataResponse)
.map(res => {
this.dataObservable = null;
return res.body;
})
.delay(1200)
})
.map ( data => {
return data;
})
.catch(err => {
console.log("err in data service", err);
// return err;
})
.share();
return this.dataObservable;
}
}
Вам понадобится плоская карта для объединения не зависимых наблюдаемых.
Plunkr: http://plnkr.co/edit/hiA1jP?p=info
Где я получил идею использовать карту переключателей: http://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html