Rx Периодически наблюдаемые испускающие значения
Мне нужно периодически опробовать некоторую конечную точку RESTful, чтобы обновить данные приложения для Android. Я также должен приостановить и возобновить его на основе подключения (если телефон отключен, нет необходимости даже пытаться). Мое текущее решение работает, но оно использует стандартный Java ScheduledExecutorService
для выполнения периодических задач, но я бы хотел остаться в парадигме Rx.
Здесь мой текущий код, части которого пропущены для краткости.
userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
@Override
public void call(final Subscriber<? super UserProfile> subscriber) {
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final Runnable runnable = new Runnable() {
@Override
public void run() {
// making http request here
}
};
final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
networkStatusObservable.subscribe(new Action1<Boolean>() {
@Override
public void call(Boolean networkAvailable) {
if (!networkAvailable) {
pause();
} else {
pause();
futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
}
}
private void pause() {
for (ScheduledFuture<?> future : futures) {
future.cancel(true);
}
futures.clear();
}
});
final Subscription subscription = new Subscription() {
private boolean isUnsubscribed = false;
@Override
public void unsubscribe() {
scheduledExecutorService.shutdownNow();
isUnsubscribed = true;
}
@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
};
subscriber.add(subscription);
}
}).multicast(BehaviorSubject.create()).refCount();
networkStatusObservable
- это, в основном, широковещательный приемник, завернутый в Observable<Boolean>
, что указывает на то, что телефон подключен к сети.
Как я уже сказал, это решение работает, но я хочу использовать Rx-подход для периодического опроса и испускания новых UserProfile
s, потому что есть множество проблем с планированием вещей вручную, чего я хочу избежать. Я знаю о Observable.timer
и Observable.interval
, но не могу понять, как применить их к этой задаче (и я не уверен, что мне нужно вообще их использовать).
Ответы
Ответ 1
Есть несколько подходов к этой проблеме GitHub, которые могут оказаться полезными.
https://github.com/ReactiveX/RxJava/issues/448
Три реализации:
Observable.interval
Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
.flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
public Observable<Notification<AppState>> call(Long seconds) {
return lyftApi.updateAppState(params).materialize(); } });
Планировщик.Специально
Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
Ручная рекурсия
Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> o) {
return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
@Override
public Subscription call(Scheduler inner, Long t2) {
o.onNext("data-from-polling");
return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
}
});
}
}).toBlockingObservable().forEach(new Action1<String>() {
@Override
public void call(String v) {
System.out.println("output: " + v);
}
});
И вывод состоит в том, что ручная рекурсия - это путь, потому что он ждет, пока операция не будет завершена, прежде чем планировать следующее выполнение.
Ответ 2
Одним из вариантов является использование Observable.interval и проверка состояния пользователя при испускании интервалов:
Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);
//pulling the user data
Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
Random random = new Random();
@Override
public Observable<String> call(Long tick) {
//here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations
switch(random.nextInt(10)){
case 0://suppose this is for cases when network in not available or exception happens
return Observable.<String>just(null);
case 1:
case 2:
return Observable.just("Alice");
default:
return Observable.just("Bob");
}
}
});
Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Observable<String> stringObservable) {
return stringObservable;
}
});
//filter valid data
Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s != null;
}
});
//publish only changes
Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();
Вы можете сделать это еще проще, если ваш networkStatusObservable испускает события, по крайней мере, так часто, как вам нужно проверить данные пользователя.
networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/)
Наконец, вы можете создать наблюдаемый, который использует планировщик для периодического извлечения пользовательских состояний - обратитесь к Документация планировщиков, чтобы узнать, какой планировщик вам подходит. лучше всего:
public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{
private final Scheduler scheduler;
private final long initialDelay;
private final long period;
private final TimeUnit unit;
public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
this.scheduler = scheduler;
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
}
abstract T next() throws Exception;
@Override
public void call(final Subscriber<? super T> subscriber) {
final Scheduler.Worker worker = scheduler.createWorker();
subscriber.add(worker);
worker.schedulePeriodically(new Action0() {
@Override
public void call() {
try {
subscriber.onNext(next());
} catch (Throwable e) {
try {
subscriber.onError(e);
} finally {
worker.unsubscribe();
}
}
}
}, initialDelay, period, unit);
}
}
//And here is the sample usage
Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){
Random random = new Random();
@Override
String next() throws Exception {
//if you use Schedulers.io, you can call the remote service synchronously
switch(random.nextInt(10)){
case 0:
return null;
case 1:
case 2:
return "Alice";
default:
return "Bob";
}
}
});
Ответ 3
Хорошо, я опубликую собственное решение, возможно, кто-то выиграет от этого. Я только опубликую часть, связанную с вопросом, опуская HTTP и кеширование. Вот как я это делаю:
private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable,
final Observable<Boolean> pauseResumeObservable) {
final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable,
new Func2<Boolean, Boolean, Boolean>() {
@Override
public Boolean call(Boolean networkAvailable, Boolean mustPause) {
return mustPause && networkAvailable;
}
}
).distinctUntilChanged();
final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean networkAvailable) {
return networkAvailable;
}
});
final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
@Override
public Boolean call(Boolean networkAvailable) {
return !networkAvailable;
}
});
return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() {
@Override
public Observable<Long> call(Boolean shouldResumeRequests) {
if (shouldResumeRequests) {
long timeToUpdate;
final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt();
timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis());
Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS));
return Observable
.timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS)
.takeUntil(hasToStopObservable);
} else {
Log.d(TAG, "Have to pause updates");
return Observable.<Long>never().takeUntil(hasToResumeObservable);
}
}
}).multicast(PublishSubject.<Long>create());
}
Как вы можете видеть, условия для приостановки или возобновления обновлений становятся несколько более сложными, а новый Observable добавлен для поддержки приостановки, когда приложение переходит в фоновый режим.
Тогда в основе решения лежит операция concatMap
, которая последовательно выделяет Observables
(следовательно, concatMap, а не flatMap, см. этот вопрос: В чем разница между concatMap и flatMap в RxJava). Он испускает либо interval
, либо never
Observables
, в зависимости от того, следует ли продолжать или приостанавливать обновление. Тогда каждое излучаемое Observable
есть takenUntil
'противоположное' Observable
испускает новое значение.
ConnectableObservable
возвращается, потому что созданный Observable
является горячим, и все предполагаемые подписчики должны подписываться на него, прежде чем он начнет что-то испускать, иначе исходные события могут быть потеряны. Я назову connect
на нем позже.
Я буду принимать либо мой, либо другой ответ, основанный на голосовании, если таковой имеется.
Ответ 4
Существует более простой способ сделать это, используя interval(). Я тестировал этот код, и он работает.
Но сначала вы должны инкапсулировать задание, которое вы хотите периодически выполнять в подклассе Action1.
class Act<T> implements Action1<T> {
public Service service;
public String data;
public void call(T t){
service.log(data); //the periodic job
}
}
(Я сохранил поля для краткости, но это не рекомендуется). Теперь вы можете запланировать его следующим образом:
Act<Long> act=new Act<>();
act.data="dummy data";
act.service=this;
Observable.interval(0l, period, TimeUnit.SECONDS).subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10))).subscribe((Action1<Long>)act);
Это не будет блокировать ваши потоки нигде, в отличие от подхода, указанного в другом ответе. Этот подход позволяет нам передавать переменную как своего рода изменяемое хранилище внутри Action, которое может быть удобно в последующих вызовах. Кроме того, таким образом вы можете подписаться на свой звонок в своем пуле потоков.