Ответ 1
Хотя я не идеальный, я считаю, что вы можете использовать побочные эффекты RX для достижения желаемого результата (операции doOn).
Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation
Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> {
// side effect variable
AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value)
return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid / onError, the later retry() will be called for new credentials
.flatMap(credentials -> api.query("request", credentials, timestamp.get())) // this will use the value from previous doOnNext
.doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp()))
.repeat();
})
.retry()
.share();
private static class CredentialsWithTimestamp {
public final String credentials;
public final long timestamp; // I assume this is necessary for you from the first request
public CredentialsWithTimestamp(String credentials, long timestamp) {
this.credentials = credentials;
this.timestamp = timestamp;
}
}
При подписке на "o" внутренний наблюдаемый будет повторяться. Если возникла ошибка, тогда "o" повторит запрос и повторно запросит его из потока учетных данных.
В вашем примере управление вычислением достигается путем обновления переменной timestamp, необходимой для следующего запроса.