RXJava - сделать паузу видимым (например, с буфером и окном)

Я хочу создать наблюдаемые, которые выполняют следующие действия:

  • буферизировать все элементы, в то время как они приостановлены
  • немедленно издает элементы, в то время как они не приостановлены
  • триггер паузы/возобновления должен поступать от другого наблюдаемого
  • он должен быть сохранен для использования наблюдаемыми, которые не запускаются в основном потоке, и он должен сохранять изменение приостановленного/возобновленного состояния из основного потока

Я хочу использовать BehaviorSubject<Boolean> как триггер и связать этот триггер с событием onResume и onPause. (Пример кода прилагается)

Вопрос

Я что-то наладил, но он не работает должным образом. Я использую его следующим образом:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

В настоящее время проблема заключается в том, что вариант 1 должен работать нормально, но иногда события просто не испускаются - клапан не испускает, пока клапан не работает (может быть проблема с резьбой...)! Решение 2 намного проще и, похоже, работает, но я не уверен, что это действительно лучше, я так не думаю. Я на самом деле не уверен, почему решение иногда терпит неудачу, поэтому я не уверен, что решение 2 решает проблему (в настоящее время для меня неизвестно)...

Может кто-нибудь сказать мне, что может быть проблемой или если простое решение должно работать надежно? Или показать мне надежное решение?

код

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

Функции RXPauser

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean ->  pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

Деятельность

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity(Bundle savedInstanceState)
    {
        super(args);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public PublishSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}

Ответы

Ответ 1

Фактически вы можете использовать оператор .buffer(), передающий его наблюдаемым, определяя, когда останавливать буферизацию, образец из книги:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);

из главы 5, "Укрощение последовательности": https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

Вы можете использовать PublishSubject как Observable для подачи его элементов в свой пользовательский оператор. Каждый раз, когда вам нужно начинать буферизацию, создайте экземпляр Observable.defer(() -> createBufferingValve())

Ответ 2

Я сделал аналогичную вещь для регистрации событий.
Тема собирает некоторые события, и один раз в 10 секунд отталкивает их на сервер.

Основная идея: например, у вас есть класс Event.

public class Event {

    public String jsonData;

    public String getJsonData() {
        return jsonData;
    }

    public Event setJsonData(String jsonData) {
        this.jsonData = jsonData;
        return this;
    }
}

Вы должны создать очередь для событий:

private PublishSubject<Event> eventQueue = PublishSubject.create();

Это может быть BehaviorSubject, это не имеет значения

Затем вы должны создать логику, которая будет обрабатывать нажатие событий на сервер:

    eventObservable = eventQueue.asObservable()
            .buffer(10, TimeUnit.SECONDS)   //flush events every 10 seconds
            .toList()
            .doOnNext(new Action1<List<Event>>() {
                @Override
                public void call(List<Event> events) {
                    apiClient.pushEvents(events);     //push your event
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() {
                @Override
                public Observable<List<Event>> call(Throwable throwable) {
                    return null;    //make sure, that on error will be never called
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());

Затем вы должны подписаться на него и сохранить подписку, пока она вам не понадобится:

eventSubscription = eventObservable.subscribe()

Домой это помогает