Как обрабатывать пейджинг с помощью RxJava?
Я изучаю преобразование моего приложения Android для использования Rxjava для сетевых запросов. В настоящее время я пользуюсь веб-сервисом, похожим на:
getUsersByKeyword(String query, int limit, int offset)
Как я понимаю, Observables являются "push", а не "pull" интерфейсом. Итак, вот как я понимаю, что нужно сделать:
- приложения регистрируются с сервисом, получая наблюдение за запросом
- результаты будут перенесены в приложение
- приложение имеет дело с результатами
- когда приложение хочет получить больше результатов...?
Это то, что вещи ломают для меня. Раньше я просто спрашивал webservice точно, что хочу, снова сделайте запрос со смещением. Но в этом случае это будет связано с созданием другого Observable и подпиской на него, вроде того, чтобы победить точку.
Как мне обрабатывать пейджинг в моем приложении? (Это приложение для Android, но я не думаю, что это важно).
Ответы
Ответ 1
Это был хард-рок!)
Итак, у нас есть запрос в сеть:
getUsersByKeyword(String query, int limit, int offset)
и этот запрос возвращает, например,
List< Result >
Если мы используем RetroFit для работы в сети, этот запрос будет выглядеть:
Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)
В результате мы хотим получить все Result
с сервера.
Так что это будет выглядеть как
int page = 50;
int limit = page;
Observable
.range(0, Integer.MAX_VALUE - 1)
.concatMap(new Func1<Integer, Observable<List<Result>>>() {
@Override
public Observable<List<Result>> call(Integer integer) {
return getUsersByKeyword(query, integer * page, limit);
}
})
.takeWhile(new Func1<List<Result>, Boolean>() {
@Override
public Boolean call(List<Result> results) {
return !results.isEmpty();
}
})
.scan(new Func2< List<Result>, List<Result>, List<Result>>() {
@Override
public List<Result> call(List<Result> results, List< Result> results2) {
List<Result> list = new ArrayList<>();
list.addAll(results);
list.addAll(results2);
return list;
}
})
.last()
.subscribe(new Subscriber<List<Result>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(List<Results> results) {
}
});
Код был ПРОВЕРЕН!
Ответ 2
Итак, если это односторонний пейджинг, вот образец, который вы можете попробовать. Этот код не был запущен или не скомпилирован, но я попытался выполнить чрезмерную аннотацию, чтобы объяснить, что происходит.
private static final int LIMIT = 50;
// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();
// Given:
// The search query keywords. Each emission here means a new top-level
// request;
Observable<String> queries;
queries.switchMap((query) -> getNextPageEvents()
// Ignore 'next page' pokes when unable to take them.
.onBackPressureDrop()
// Seed with the first page.
.startWith(new NextPageEvent())
// Increment the page number on each request.
.scan(0, (page, event) -> page + 1)
// Request the page from the server.
.concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
// Unroll Observable<List<User> into Observable<User>
.concatMap((userList) -> Observable.from(userList))
.retryWhen(/** Insert your favorite retry logic here. */))
// Only process new page requests sequentially.
.scheduleOn(Schedulers.trampoline())
// Trampoline schedules on the 'current thread', so we make sure that's
// a background IO thread.
.scheduleOn(Schedulers.io());
Это означает, что сигнал "следующей страницы" запускает загрузку данных следующей страницы каждый раз, а также не перескакивает страницы, если он сталкивается с ошибкой при загрузке. Он также полностью перезапускается на верхнем уровне, если получает новый поисковый запрос. Если у меня (или у кого-то еще?) Есть время, я бы хотел проверить мои предположения о батуте и противодавлении и убедиться, что он блокирует любую попытку преждевременного получения следующей страницы во время загрузки.
Ответ 3
Я сделал это, и на самом деле это не так сложно.
Подход заключается в моделировании каждого первого запроса (смещение 0) в firstRequestsObservable. Чтобы упростить задачу, вы можете сделать это как PublishSubject, где вы вызываете onNext()
для подачи следующего запроса, но есть более разумные способы, не связанные с объектом (например, если запросы выполняются при нажатии кнопки, тогда requestObservable является clickObservable, отображаемым через некоторые операторы).
Как только у вас есть firstRequestsObservable
, вы можете сделать responseObservable
с помощью flatMapping из firstRequestsObservable
и т.д., чтобы выполнить служебный вызов.
Теперь вот трюк: сделайте еще один наблюдаемый вызванный subsequentRequestsObservable
, который отображается из responseObservable
, увеличивая смещение (для этого полезно включить в данные ответа смещение исходного запроса). Как только вы представите это наблюдаемое, теперь вам нужно изменить определение responseObservable
, чтобы оно зависело также от subsequentRequestsObservable
. Затем вы получаете круговую зависимость следующим образом:
firstRequestsObservable → responseObservable → followingRequestsObservable → responseObservable → followingRequestsObservable → ...
Чтобы разорвать этот цикл, вы, вероятно, захотите включить оператор filter
в определение subsequentRequestsObservable
, отфильтровывая те случаи, когда смещение будет передавать "общий" предел. Круговая зависимость также означает, что вам нужно иметь один из тех, кто является Субъектом, иначе было бы невозможно объявить наблюдаемые. Я рекомендую responseObservable
быть тем субъектом.
Итак, в целом вы сначала инициализируете responseObservable как предмет, затем объявляете firstRequestsObservable, а затем объявляете nextRequestsObservable в результате передачи responseObservable через некоторые операторы. responseObservable может быть "подан" с помощью onNext
.
Ответ 4
Чтобы быть ясным, я полагаю, что ваши вопросы касаются более того, как применять RxJava в приложении для Android, а не на заднем конце (хотя это можно применить тоже, но не так типично, как передняя часть).
И я не очень уверен, что этот пример является типичным примером использования модели реактивного функционального программирования (RFP), за исключением чистых конструктов кода.
Каждый поток ниже является наблюдаемым. Лично я хотел бы думать, что это поток, поскольку легко рассуждать о событиях.
Поток разбиения на страницы может быть представлен 6 потоками:
firstPageStream -f-------------------> // this is actually to produce very first event to obtain the first page result. The event can come from a touch in one of the screen that navigates to this list screen.
nextPageStream -----n-------n-------> // this is the source of events coming from Next button 'touch' actions
prevPageStream ---------p-------p---> // this is the source of events coming from Previous button 'touch' actions
requestStream -r---r---r---r---r---> // this is to consume the signal from 3 streams above and spawn events (r) which create a query and pagination details i.e.: offset and limit
responseStream -R---R---R---R---R---> // this is to take each r and invoke your web service getUsersByKeyword() then spawn the response (R)
Вышеуказанные потоки могут быть представлены в псевдокоде (стиль JS) ниже (его было бы довольно легко перевести на RxJava или на другие языки)
firstPageStream = Observable.just(0); // offset=0
nextPageStream = Observable.fromEvent(nextButton, 'touch')
.map(function() {
offset = offset + limit;
return offset;
});
prevPageStream = Observable.fromEvent(prevButton, 'touch')
.map(function() {
offset = offset - limit; // TODO some check here
return offset;
});
requestStream = Observable.merge(firstPageStream, nextPageStream, prevPageStream)
.map(function(offsetValue) {
return {offset : offsetValue, limit: limit};
});
responseStream = requestStream.flatMap(function(pagination) {
return webservice.getUsersByKeyword(query, pagination.offset, pagination.limit); // assume this is async response
});
responseStream.subscribe(function(result) {
// use result to render the display
});
PS1: Я не тестировал вышеуказанный код. Я изучаю RFP, поэтому я просто пытаюсь думать реактивно, записывая. Добро пожаловать в любое предложение.
PS2: На меня сильно влияет https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 для объяснения реактивных потоков.