Повторить логику с помощью CompletableFuture
Мне нужно отправить задачу в структуру async, над которой я работаю, но мне нужно поймать исключения и повторить одну и ту же задачу несколько раз до "прерывания".
Код, с которым я работаю, это:
int retries = 0;
public CompletableFuture<Result> executeActionAsync() {
// Execute the action async and get the future
CompletableFuture<Result> f = executeMycustomActionHere();
// If the future completes with exception:
f.exceptionally(ex -> {
retries++; // Increment the retry count
if (retries < MAX_RETRIES)
return executeActionAsync(); // <--- Submit one more time
// Abort with a null value
return null;
});
// Return the future
return f;
}
В настоящее время это не компилируется, потому что тип возврата лямбда неверен: он ожидает Result
, но executeActionAsync
возвращает a CompletableFuture<Result>
.
Как я могу реализовать эту полностью асинхронную логику повтора?
Ответы
Ответ 1
Я думаю, что я был успешно. Вот пример класса, который я создал, и тестовый код:
RetriableTask.java
public class RetriableTask
{
protected static final int MAX_RETRIES = 10;
protected int retries = 0;
protected int n = 0;
protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();
public RetriableTask(int number) {
n = number;
}
public CompletableFuture<Integer> executeAsync() {
// Create a failure within variable timeout
Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);
// Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts.
// In real application this should be a real future
final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
if (n > 5 && retries > 5)
taskFuture.complete(retries * n);
// Attach the failure future to the task future, and perform a check on completion
taskFuture.applyToEither(timeoutFuture, Function.identity())
.whenCompleteAsync((result, exception) -> {
if (exception == null) {
future.complete(result);
} else {
retries++;
if (retries >= MAX_RETRIES) {
future.completeExceptionally(exception);
} else {
executeAsync();
}
}
});
// Return the future
return future;
}
}
использование
int size = 10;
System.out.println("generating...");
List<RetriableTask> tasks = new ArrayList<>();
for (int i = 0; i < size; i++) {
tasks.add(new RetriableTask(i));
}
System.out.println("issuing...");
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < size; i++) {
futures.add(tasks.get(i).executeAsync());
}
System.out.println("Waiting...");
for (int i = 0; i < size; i++) {
try {
CompletableFuture<Integer> future = futures.get(i);
int result = future.get();
System.out.println(i + " result is " + result);
} catch (Exception ex) {
System.out.println(i + " I got exception!");
}
}
System.out.println("Done waiting...");
Выход
generating...
issuing...
Waiting...
0 I got exception!
1 I got exception!
2 I got exception!
3 I got exception!
4 I got exception!
5 I got exception!
6 result is 36
7 result is 42
8 result is 48
9 result is 54
Done waiting...
Основная идея и некоторый связующий код (функция failAfter
) приходят отсюда.
Любые другие предложения или улучшения приветствуются.
Ответ 2
Последовательность повторных попыток может быть простой:
public CompletableFuture<Result> executeActionAsync() {
CompletableFuture<Result> f=executeMycustomActionHere();
for(int i=0; i<MAX_RETRIES; i++) {
f=f.exceptionally(t -> executeMycustomActionHere().join());
}
return f;
}
Читайте о недостатках ниже
Это просто объединяет столько попыток, сколько нужно, поскольку эти последующие этапы ничего не сделают в неисключительном случае.
Один недостаток заключается в том, что если первая попытка завершается неудачно сразу, так что f
уже завершена исключительно, когда первый exceptionally
обработчик связан в цепочку, вызывающий поток будет вызывать действие, полностью удаляя асинхронную природу запроса. И вообще, join()
может заблокировать поток (исполнитель по умолчанию запустит новый поток компенсации, но все же его не рекомендуется). К сожалению, здесь нет ни метода exceptionallyCompose
асинхронного, ни exceptionallyAsync
exceptionallyCompose
сложного.
Решение, не вызывающее join()
, будет
public CompletableFuture<Result> executeActionAsync() {
CompletableFuture<Result> f=executeMycustomActionHere();
for(int i=0; i<MAX_RETRIES; i++) {
f=f.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> executeMycustomActionHere())
.thenCompose(Function.identity());
}
return f;
}
демонстрируя, насколько сложным является комбинирование "compose" и "исключительно" обработчика.
Кроме того, будет сообщено только последнее исключение, если все повторные попытки не пройдены. Лучшее решение должно сообщать о первом исключении, с последующими исключениями повторных попыток, добавленными как исключенные исключения. Такое решение может быть построено путем объединения рекурсивного вызова, как намекает ответ Гилиса, однако, чтобы использовать эту идею для обработки исключений, мы должны использовать шаги для объединения "составить" и "исключительно", показанные выше:
public CompletableFuture<Result> executeActionAsync() {
return executeMycustomActionHere()
.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> retry(t, 0))
.thenCompose(Function.identity());
}
private CompletableFuture<Result> retry(Throwable first, int retry) {
if(retry >= MAX_RETRIES) return CompletableFuture.failedFuture(first);
return executeMycustomActionHere()
.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> { first.addSuppressed(t); return retry(first, retry+1); })
.thenCompose(Function.identity());
}
CompletableFuture.failedFuture
- это метод Java 9, но было бы тривиально добавить бэкпорт, совместимый с Java 8, в ваш код, если это необходимо:
public static <T> CompletableFuture<T> failedFuture(Throwable t) {
final CompletableFuture<T> cf = new CompletableFuture<>();
cf.completeExceptionally(t);
return cf;
}
Ответ 3
Недавно я решил аналогичную проблему, используя библиотеку guava-retrying.
Callable<Result> callable = new Callable<Result>() {
public Result call() throws Exception {
return executeMycustomActionHere();
}
};
Retryer<Boolean> retryer = RetryerBuilder.<Result>newBuilder()
.retryIfResult(Predicates.<Result>isNull())
.retryIfExceptionOfType(IOException.class)
.retryIfRuntimeException()
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRIES))
.build();
CompletableFuture.supplyAsync( () -> {
try {
retryer.call(callable);
} catch (RetryException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
Ответ 4
Вот подход, который будет работать для любого подкласса CompletionStage
и не будет возвращать фиктивный CompletableFuture
, который не более чем ждет, чтобы получить обновление от других фьючерсов.
/**
* Sends a request that may run as many times as necessary.
*
* @param request a supplier initiates an HTTP request
* @param executor the Executor used to run the request
* @return the server response
*/
public CompletionStage<Response> asyncRequest(Supplier<CompletionStage<Response>> request, Executor executor)
{
return retry(request, executor, 0);
}
/**
* Sends a request that may run as many times as necessary.
*
* @param request a supplier initiates an HTTP request
* @param executor the Executor used to run the request
* @param tries the number of times the operation has been retried
* @return the server response
*/
private CompletionStage<Response> retry(Supplier<CompletionStage<Response>> request, Executor executor, int tries)
{
if (tries >= MAX_RETRIES)
throw new CompletionException(new IOException("Request failed after " + MAX_RETRIES + " tries"));
return request.get().thenComposeAsync(response ->
{
if (response.getStatusInfo().getFamily() != Response.Status.Family.SUCCESSFUL)
return retry(request, executor, tries + 1);
return CompletableFuture.completedFuture(response);
}, executor);
}
Ответ 5
Вместо того, чтобы реализовывать собственную логику повторов, я рекомендую использовать проверенную библиотеку, такую как failsafe, которая имеет встроенную поддержку для futures (и, кажется, более популярна, чем повторные попытки guava). Для вашего примера это будет выглядеть примерно так:
private static RetryPolicy retryPolicy = new RetryPolicy()
.withMaxRetries(MAX_RETRIES);
public CompletableFuture<Result> executeActionAsync() {
return Failsafe.with(retryPolicy)
.with(executor)
.withFallback(null)
.future(this::executeMycustomActionHere);
}
Вероятно, вам следует избегать .withFallback(null)
и просто позволить возвращенному будущему .get()
результирующее исключение, чтобы вызывающая .get()
вашего метода могла обработать его специально, но это дизайнерское решение вам придется принять.
Другие вещи, о которых следует подумать, включают: следует ли вам повторить попытку немедленно или подождать некоторое время между попытками, какой-либо рекурсивный откат (полезно при вызове веб-службы, которая может быть недоступна), и есть ли конкретные исключения, которые не ' Стоит повторить попытку (например, если параметры метода неверны).
Ответ 6
класс утилит:
public class RetryUtil {
public static <R> CompletableFuture<R> retry(Supplier<CompletableFuture<R>> supplier, int maxRetries) {
CompletableFuture<R> f = supplier.get();
for(int i=0; i<maxRetries; i++) {
f=f.thenApply(CompletableFuture::completedFuture)
.exceptionally(t -> {
System.out.println("retry for: "+t.getMessage());
return supplier.get();
})
.thenCompose(Function.identity());
}
return f;
}
}
использование:
public CompletableFuture<String> lucky(){
return CompletableFuture.supplyAsync(()->{
double luckNum = Math.random();
double luckEnough = 0.6;
if(luckNum < luckEnough){
throw new RuntimeException("not luck enough: " + luckNum);
}
return "I'm lucky: "+luckNum;
});
}
@Test
public void testRetry(){
CompletableFuture<String> retry = RetryUtil.retry(this::lucky, 10);
System.out.println("async check");
String join = retry.join();
System.out.println("lucky? "+join);
}
выход
async check
retry for: java.lang.RuntimeException: not luck enough: 0.412296354211683
retry for: java.lang.RuntimeException: not luck enough: 0.4099777199676573
lucky? I'm lucky: 0.8059089479049389
Ответ 7
Что говорит о гораздо более прямолинейном решении с простым потоком, который просто перезаписывает CompletableFuture, если оно завершено неудачно? Вы также можете использовать здесь параллельный поток для параллельного выполнения вашей функции.
public CompletableFuture<Integer> executeActionAsync() {
return List.of(1, 2, 3, 4, 5).stream().map(this::executeMycustomActionHere)
.filter(f -> !f.isCompletedExceptionally())
.findFirst().orElse(CompletableFuture.failedFuture(new IllegalStateException("Not successful")));
}
public CompletableFuture<Integer> executeMycustomActionHere(Integer n) {
System.out.println("Checking now: " + n);
if(n < 3) return CompletableFuture.failedFuture(new IllegalStateException("Fail for " + n));
return CompletableFuture.completedFuture(n);
}