Rxjava Android, как использовать Zip-оператор
У меня возникли проблемы с пониманием оператора zip в RxJava для моего проекта Android.
проблема
Мне нужно отправить сетевой запрос для загрузки видео
Затем мне нужно отправить сетевой запрос, чтобы загрузить изображение, чтобы пойти с ним
наконец, мне нужно добавить описание и использовать ответы от предыдущих двух запросов, чтобы загрузить URL-адреса местоположения видео и изображения вместе с описанием на мой сервер.
Я предположил, что zip-оператор был бы идеальным для этой задачи, поскольку я понял, что мы могли бы взять ответ двух наблюдаемых (запросы видео и изображений) и использовать их для моей последней задачи.
Но я не могу понять, как это происходит.
Я ищу кого-то, чтобы ответить, как это можно сделать концептуально с помощью некоторого кода psuedo.
Спасибо вам
Ответы
Ответ 1
Оператор Zip строго пар испускает элементы из наблюдаемых. Он ожидает, что оба (или более) элемента будут отправлены, а затем слейт их. Так что да, это будет подходящим для ваших нужд.
Я использовал бы Func2
, чтобы связать результат с первыми двумя наблюдаемыми.
Обратите внимание, что этот подход будет проще, если вы используете Retrofit, так как его интерфейс api может вернуть наблюдаемый. В противном случае вам нужно будет создать свой собственный наблюдаемый.
// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb,
new Func2<String, String, MyResult>() {
@Override
public MyResult call(String movieUploadResponse,
String picUploadResponse) {
// analyze both responses, upload them to another server
// and return this method with a MyResult type
return myResult;
}
}
)
// continue chaining this observable with subscriber
// or use it for something else
Ответ 2
Шаг за шагом: → Позволяет сначала определить наш объект Retrofit для доступа к API Githubs, а затем установить два наблюдаемых для двух указанных выше сетевых запросов:
Retrofit repo = new Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
Observable<JsonObject> userObservable = repo
.create(GitHubUser.class)
.getUser(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Observable<JsonArray> eventsObservable = repo
.create(GitHubEvents.class)
.listEvents(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Интерфейсы Retrofit достаточно просты:
public interface GitHubUser {
@GET("users/{user}")
Observable<JsonObject> getUser(@Path("user") String user);
}
public interface GitHubEvents {
@GET("users/{user}/events")
Observable<JsonArray> listEvents(@Path("user") String user);
}
В последнее время мы используем метод RxJavas zip для объединения наших двух наблюдаемых и дождаемся их завершения до создания нового Observable.
Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
@Override
public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
return new UserAndEvents(jsonObject, jsonElements);
}
});
Что такое UserAndEvents? Простое POJO для объединения двух объектов:
public class UserAndEvents {
public UserAndEvents(JsonObject user, JsonArray events) {
this.events = events;
this.user = user;
}
public JsonArray events;
public JsonObject user;
}
Наконец, позвоните по методу подписки на наш новый объединенный Observable:
combined.subscribe(new Subscriber<UserAndEvents>() {
...
@Override
public void onNext(UserAndEvents o) {
// You can access the results of the
// two observabes via the POJO now
}
});
Ответ 3
Маленький example:
Observable<String> stringObservable1 = Observable.just("Hello", "World");
Observable<String> stringObservable2 = Observable.just("Bye", "Friends");
Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() {
@Override
public String apply(@NonNull String s, @NonNull String s2) throws Exception {
return s + " - " + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
Это напечатает:
Hello - Bye
World - Friends
Ответ 4
Здесь у меня есть пример, который я использовал с помощью Zip в асинхронном режиме, на случай, если вам любопытно
/**
* Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
/**
* In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline
*/
@Test
public void testZip() {
long start = System.currentTimeMillis();
Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Sync in:", start, result));
}
public void showResult(String transactionType, long start, String result) {
System.out.println(result + " " +
transactionType + String.valueOf(System.currentTimeMillis() - start));
}
public Observable<String> obString() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obString1() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obString2() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
public Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
Здесь вы можете увидеть больше примеров https://github.com/politrons/reactive
Ответ 5
Я искал простой ответ о том, как использовать Zip-оператор, и что делать с Observables, которые я создаю, чтобы передать их ему, мне было интересно, следует ли мне звонить subscribe() для каждого наблюдаемого или нет, не из этих ответов было просто найти, я должен был понять это сам, так что вот простой пример использования Zip-оператора на 2 Observables:
@Test
public void zipOperator() throws Exception {
List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4);
List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
Observable<Integer> indexesObservable = Observable.fromIterable(indexes);
Observable<String> lettersObservable = Observable.fromIterable(letters);
Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems())
.subscribe(printMergedItems());
}
@NonNull
private BiFunction<Integer, String, String> mergeEmittedItems() {
return new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer index, String letter) throws Exception {
return "[" + index + "] " + letter;
}
};
}
@NonNull
private Consumer<String> printMergedItems() {
return new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
}
печатный результат:
[0] a
[1] b
[2] c
[3] d
[4] e
окончательные ответы на вопросы, что в моей голове были
Наблюдаемые, передаваемые методу zip(), просто должны быть созданы только, им не нужно иметь никаких подписчиков, но их создание достаточно... если вы хотите, чтобы какие-либо наблюдаемые выполнялись в планировщике, вы может указать это для этого Observable... Я также попробовал оператор zip() в Observables, где они должны ждать результата, а Расходная часть zip() была вызвана только тогда, когда оба результата, где они готовы (что является ожидаемым поведением)
Ответ 6
Оператор zip
позволяет вам составить результат из результатов двух разных наблюдаемых.
Вам нужно будет дать лямбду, которая создаст результат из данных, излучаемых каждым наблюдаемым.
Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...
Observable<Response> response = movies.zipWith(picture, (movie, pic) -> {
return new Response("description", movie.getName(), pic.getUrl());
});