Задачи в очереди с помощью RxJava на Android
Я разрабатываю приложение для Android с синхронизацией фоновых данных.
В настоящее время я использую RxJava для размещения некоторых данных на сервере через регулярные промежутки времени.
Кроме этого, я хотел бы предоставить пользователю кнопку "принудительная синхронизация", которая немедленно вызовет синхронизацию.
Я знаю, как использовать Observable.interval()
для перемещения данных через регулярные промежутки времени, и я бы знал, как использовать Observalbe.just()
для форматирования того, что принудительно, но я хотел бы поставить их в очередь, если так случилось, что один из них запускается во время предыдущее все еще выполняется.
Итак, давайте возьмем пример, когда 1min - это интервал автоматической синхронизации, и пусть говорят, что синхронизация длится 40 секунд (я больше преувеличиваю здесь, чтобы сделать более легкую точку). Теперь, если случайно пользователь нажимает кнопку "force", когда автомат все еще работает (или наоборот - автоматические триггеры, когда принудительный один все еще запущен), я хотел бы поставить в очередь второй запрос синхронизации, чтобы идти так же, как и первый заканчивается.
Я нарисовал это изображение, которое может показаться ему более перспективным:
![введите описание изображения здесь]()
Как вы можете видеть, автоматический запуск (с помощью некоторых Observable.interval()
), а в середине синхронизации пользователь нажимает кнопку "force". Теперь мы хотим дождаться завершения первого запроса и затем снова начать принудительный запрос.
В какой-то момент, когда принудительный запрос запущен, новый автоматический запрос был снова запущен, что только добавило его в очередь. После того, как последний был завершен из очереди, все останавливается, а затем автоматический запуск запланирован немного позже.
Надеюсь, что кто-нибудь может указать мне, чтобы исправить оператора, как это сделать. Я пробовал с Observable.combineLatest()
, но список очередей был отправлен с самого начала, и когда я добавил новую синхронизацию в очередь, она не продолжилась, когда предыдущая операция была завершена.
Любая помощь приветствуется,
Дарко
Ответы
Ответ 1
Вы можете сделать это, объединив таймер с помощью кнопки Observable
/Subject
, используя эффект очереди onBackpressureBuffer
и concatMap обработки в нее, которая гарантирует, что она запускается по одному.
PublishSubject<Long> subject = PublishSubject.create();
Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);
periodic.mergeWith(subject)
.onBackpressureBuffer()
.concatMap(new Func1<Long, Observable<Integer>>() {
@Override
public Observable<Integer> call(Long v) {
// simulates the task to run
return Observable.just(1)
.delay(300, TimeUnit.MILLISECONDS);
}
}
).subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(1100);
// user clicks a button
subject.onNext(-1L);
Thread.sleep(800);
Ответ 2
Несмотря на то, что есть приемлемый ответ с хорошим решением, я хотел бы поделиться другим вариантом для этого, используя Scheduler и SingleThreadExecutor
public static void main(String[] args) throws Exception {
System.out.println(" init ");
Observable<Long> numberObservable =
Observable.interval(700, TimeUnit.MILLISECONDS).take(10);
final Subject subject = PublishSubject.create();
Executor executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor);
numberObservable.observeOn(scheduler).subscribe(subject);
subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"),
onCompleteFunc("subscriber 1"));
Thread.sleep(800);
//simulate action
executor.execute(new Runnable() {
@Override
public void run() {
subject.onNext(333l);
}
});
Thread.sleep(5000);
}
static Action1<Long> onNextFunc(final String who) {
return new Action1<Long>() {
public void call(Long x) {
System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName()
+ " -- " + System.currentTimeMillis());
try {
//simulate some work
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
static Action1<Throwable> onErrorFunc(final String who) {
return new Action1<Throwable>() {
public void call(Throwable t) {
t.printStackTrace();
}
};
}
static Action0 onCompleteFunc(final String who) {
return new Action0() {
public void call() {
System.out.println(who + " complete");
}
};
}