Как обрабатывать пейджинг с помощью RxJava?

Я изучаю преобразование моего приложения Android для использования Rxjava для сетевых запросов. В настоящее время я пользуюсь веб-сервисом, похожим на:

getUsersByKeyword(String query, int limit, int offset)

Как я понимаю, Observables являются "push", а не "pull" интерфейсом. Итак, вот как я понимаю, что нужно сделать:

  • приложения регистрируются с сервисом, получая наблюдение за запросом
  • результаты будут перенесены в приложение
  • приложение имеет дело с результатами
  • когда приложение хочет получить больше результатов...?

Это то, что вещи ломают для меня. Раньше я просто спрашивал webservice точно, что хочу, снова сделайте запрос со смещением. Но в этом случае это будет связано с созданием другого Observable и подпиской на него, вроде того, чтобы победить точку.

Как мне обрабатывать пейджинг в моем приложении? (Это приложение для Android, но я не думаю, что это важно).

Ответы

Ответ 1

Это был хард-рок!) Итак, у нас есть запрос в сеть:
getUsersByKeyword(String query, int limit, int offset)
и этот запрос возвращает, например,
List< Result >
Если мы используем RetroFit для работы в сети, этот запрос будет выглядеть:
Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)
В результате мы хотим получить все Result с сервера.
Так что это будет выглядеть как

int page = 50;
int limit = page;
Observable
                .range(0, Integer.MAX_VALUE - 1)
                .concatMap(new Func1<Integer, Observable<List<Result>>>() {
                    @Override
                    public Observable<List<Result>> call(Integer integer) {
                        return getUsersByKeyword(query, integer * page, limit);
                    }
                })
                .takeWhile(new Func1<List<Result>, Boolean>() {
                    @Override
                    public Boolean call(List<Result> results) {
                        return !results.isEmpty();
                    }
                })
                .scan(new Func2< List<Result>, List<Result>, List<Result>>() {
                    @Override
                    public List<Result> call(List<Result> results, List< Result> results2) {
                        List<Result> list = new ArrayList<>();
                        list.addAll(results);
                        list.addAll(results2);
                        return list;
                    }
                })
                .last()
                .subscribe(new Subscriber<List<Result>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(List<Results> results) {
                    }
                });

Код был ПРОВЕРЕН!

Ответ 2

Итак, если это односторонний пейджинг, вот образец, который вы можете попробовать. Этот код не был запущен или не скомпилирован, но я попытался выполнить чрезмерную аннотацию, чтобы объяснить, что происходит.

private static final int LIMIT = 50;

// Given: Returns a stream of dummy event objects telling us when
// to grab the next page. This could be from a click or wherever.
Observable<NextPageEvent> getNextPageEvents();  

// Given:
// The search query keywords. Each emission here means a new top-level
// request;
Observable<String> queries;

queries.switchMap((query) -> getNextPageEvents()
        // Ignore 'next page' pokes when unable to take them.
        .onBackPressureDrop()
        // Seed with the first page.
        .startWith(new NextPageEvent())
        // Increment the page number on each request.
        .scan(0, (page, event) -> page + 1) 
        // Request the page from the server.
        .concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page)
                // Unroll Observable<List<User> into Observable<User>
                .concatMap((userList) -> Observable.from(userList))
                .retryWhen(/** Insert your favorite retry logic here. */))
        // Only process new page requests sequentially.
        .scheduleOn(Schedulers.trampoline())
        // Trampoline schedules on the 'current thread', so we make sure that's
        // a background IO thread.
        .scheduleOn(Schedulers.io());

Это означает, что сигнал "следующей страницы" запускает загрузку данных следующей страницы каждый раз, а также не перескакивает страницы, если он сталкивается с ошибкой при загрузке. Он также полностью перезапускается на верхнем уровне, если получает новый поисковый запрос. Если у меня (или у кого-то еще?) Есть время, я бы хотел проверить мои предположения о батуте и противодавлении и убедиться, что он блокирует любую попытку преждевременного получения следующей страницы во время загрузки.

Ответ 3

Я сделал это, и на самом деле это не так сложно.

Подход заключается в моделировании каждого первого запроса (смещение 0) в firstRequestsObservable. Чтобы упростить задачу, вы можете сделать это как PublishSubject, где вы вызываете onNext() для подачи следующего запроса, но есть более разумные способы, не связанные с объектом (например, если запросы выполняются при нажатии кнопки, тогда requestObservable является clickObservable, отображаемым через некоторые операторы).

Как только у вас есть firstRequestsObservable, вы можете сделать responseObservable с помощью flatMapping из firstRequestsObservable и т.д., чтобы выполнить служебный вызов.

Теперь вот трюк: сделайте еще один наблюдаемый вызванный subsequentRequestsObservable, который отображается из responseObservable, увеличивая смещение (для этого полезно включить в данные ответа смещение исходного запроса). Как только вы представите это наблюдаемое, теперь вам нужно изменить определение responseObservable, чтобы оно зависело также от subsequentRequestsObservable. Затем вы получаете круговую зависимость следующим образом:

firstRequestsObservable → responseObservable → followingRequestsObservable → responseObservable → followingRequestsObservable → ...

Чтобы разорвать этот цикл, вы, вероятно, захотите включить оператор filter в определение subsequentRequestsObservable, отфильтровывая те случаи, когда смещение будет передавать "общий" предел. Круговая зависимость также означает, что вам нужно иметь один из тех, кто является Субъектом, иначе было бы невозможно объявить наблюдаемые. Я рекомендую responseObservable быть тем субъектом.

Итак, в целом вы сначала инициализируете responseObservable как предмет, затем объявляете firstRequestsObservable, а затем объявляете nextRequestsObservable в результате передачи responseObservable через некоторые операторы. responseObservable может быть "подан" с помощью onNext.

Ответ 4

Чтобы быть ясным, я полагаю, что ваши вопросы касаются более того, как применять RxJava в приложении для Android, а не на заднем конце (хотя это можно применить тоже, но не так типично, как передняя часть). И я не очень уверен, что этот пример является типичным примером использования модели реактивного функционального программирования (RFP), за исключением чистых конструктов кода.

Каждый поток ниже является наблюдаемым. Лично я хотел бы думать, что это поток, поскольку легко рассуждать о событиях. Поток разбиения на страницы может быть представлен 6 потоками:

firstPageStream -f-------------------> // this is actually to produce very first event to obtain the first page result. The event can come from a touch in one of the screen that navigates to this list screen.
nextPageStream  -----n-------n-------> // this is the source of events coming from Next button 'touch' actions
prevPageStream  ---------p-------p---> // this is the source of events coming from Previous button 'touch' actions
requestStream   -r---r---r---r---r---> // this is to consume the signal from 3 streams above and spawn events (r) which create a query and pagination details i.e.: offset and limit
responseStream  -R---R---R---R---R---> // this is to take each r and invoke your web service getUsersByKeyword() then spawn the response (R)

Вышеуказанные потоки могут быть представлены в псевдокоде (стиль JS) ниже (его было бы довольно легко перевести на RxJava или на другие языки)

firstPageStream = Observable.just(0); // offset=0
nextPageStream = Observable.fromEvent(nextButton, 'touch')
                           .map(function() {
                              offset = offset + limit;
                              return offset;
                            });
prevPageStream = Observable.fromEvent(prevButton, 'touch')
                           .map(function() {
                              offset = offset - limit; // TODO some check here
                              return offset;
                            });
requestStream = Observable.merge(firstPageStream, nextPageStream, prevPageStream)
                            .map(function(offsetValue) {
                               return {offset : offsetValue, limit: limit};
                            });
responseStream = requestStream.flatMap(function(pagination) {
                                return webservice.getUsersByKeyword(query, pagination.offset, pagination.limit); // assume this is async response
                              });
responseStream.subscribe(function(result) {
  // use result to render the display
});

PS1: Я не тестировал вышеуказанный код. Я изучаю RFP, поэтому я просто пытаюсь думать реактивно, записывая. Добро пожаловать в любое предложение.

PS2: На меня сильно влияет https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 для объяснения реактивных потоков.