Таймаут со значением по умолчанию в Java 8 CompletedFuture
Предположим, что у меня есть асинхронное вычисление, например:
CompletableFuture
.supplyAsync(() -> createFoo())
.thenAccept(foo -> doStuffWithFoo(foo));
Есть ли хороший способ предоставить значение по умолчанию для foo, если поставщик async истекает в соответствии с определенным таймаутом? В идеале такая функциональность также попытается отменить медленного поставщика. Например, существует ли стандартная библиотечная функциональность, похожая на следующий гипотетический код:
CompletableFuture
.supplyAsync(() -> createFoo())
.acceptEither(
CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO),
foo -> doStuffWithFoo(foo));
Или, может быть, даже лучше:
CompletableFuture
.supplyAsync(() -> createFoo())
.withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS)
.thenAccept(foo -> doStuffWithFoo(foo));
Я знаю о get(timeout, unit)
, но мне интересно, есть ли более стандартный стандартный способ применения тайм-аута в асинхронном и реактивном режиме, как это предлагается в коде выше.
EDIT: здесь решение, которое вдохновлено Java 8: Обязательно проверяет обработку исключений в лямбда-выражениях. Почему обязательный, а не факультативный?, но, к сожалению, он блокирует поток. Если мы полагаемся на createFoo(), чтобы асинхронно проверять тайм-аут и бросать свой собственный тайм-аут, он будет работать без блокировки потока, но возлагает больше бремя на создателя поставщика и будет по-прежнему иметь затраты на создание исключения (которое может быть дорогим без "быстрого броска" )
static <T> Supplier<T> wrapped(Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (RuntimeException e1) {
throw e1;
} catch (Throwable e2) {
throw new RuntimeException(e2);
}
};
}
CompletableFuture
.supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS)))
.exceptionally(e -> "default")
.thenAcceptAsync(s -> doStuffWithFoo(foo));
Ответы
Ответ 1
CompletableFuture.supplyAsync - это просто вспомогательный метод, который создает для вас CompletableFuture и отправляет задачу в пул ForkJoin.
Вы можете создать свой собственный источникAsync с такими требованиями, как это:
private static final ScheduledExecutorService schedulerExecutor =
Executors.newScheduledThreadPool(10);
private static final ExecutorService executorService =
Executors.newCachedThreadPool();
public static <T> CompletableFuture<T> supplyAsync(
final Supplier<T> supplier, long timeoutValue, TimeUnit timeUnit,
T defaultValue) {
final CompletableFuture<T> cf = new CompletableFuture<T>();
// as pointed out by Peti, the ForkJoinPool.commonPool() delivers a
// ForkJoinTask implementation of Future, that doesn't interrupt when cancelling
// Using Executors.newCachedThreadPool instead in the example
// submit task
Future<?> future = executorService.submit(() -> {
try {
cf.complete(supplier.get());
} catch (Throwable ex) {
cf.completeExceptionally(ex);
}
});
//schedule watcher
schedulerExecutor.schedule(() -> {
if (!cf.isDone()) {
cf.complete(defaultValue);
future.cancel(true);
}
}, timeoutValue, timeUnit);
return cf;
}
Создание CompletableFuture с помощью этого помощника так же просто, как использование статического метода в CompletableFuture:
CompletableFuture<String> a = supplyAsync(() -> "hi", 1,
TimeUnit.SECONDS, "default");
Чтобы проверить это:
a = supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
// ignore
}
return "hi";
}, 1, TimeUnit.SECONDS, "default");
Ответ 2
В Java 9 будет completeOnTimeout (значение T, длительный тайм-аут, единица TimeUnit), которое делает то, что вы хотите, хотя оно не отменяет медленного поставщика.
Существует также orTimeout (long timeout, TimeUnit unit), который завершается исключительно в случае таймаута.
Ответ 3
У DZone есть хорошая статья, как решить эту проблему: https://dzone.com/articles/asynchronous-timeouts
Я не уверен в авторских правах на код, поэтому я не могу его скопировать. Решение очень похоже на решение от Dane White, но оно использует пул потоков с одним потоком плюс schedule()
, чтобы избежать потери потока только для ожидания таймаута.
Он также выдает TimeoutException
вместо возврата по умолчанию.
Ответ 4
Думаю, вам всегда понадобится дополнительный мониторинг потока, когда его время будет предоставлено значение по умолчанию. Я, вероятно, поеду на маршрут с двумя вызовами supplyAsync, с по умолчанию включенными в API-интерфейс утилиты, связанными с acceptEither. Если вы предпочитаете обертывать своего Поставщика, тогда вы можете использовать API-интерфейс утилиты, который делает для вас "любой" вызов:
public class TimeoutDefault {
public static <T> CompletableFuture<T> with(T t, int ms) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(ms);
} catch (InterruptedException e) { }
return t;
});
}
public static <T> Supplier<T> with(Supplier<T> supplier, T t, int ms) {
return () -> CompletableFuture.supplyAsync(supplier)
.applyToEither(TimeoutDefault.with(t, ms), i -> i).join();
}
}
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(Example::createFoo)
.acceptEither(
TimeoutDefault.with("default", 1000),
Example::doStuffWithFoo);
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000))
.thenAccept(Example::doStuffWithFoo);
Ответ 5
Нет стандартного библиотечного метода для построения CompletableFuture, поставляемого со значением после таймаута. Тем не менее, очень просто сворачивать свои ресурсы с минимальными ресурсами:
private static final ScheduledExecutorService EXECUTOR
= Executors.newSingleThreadScheduledExecutor();
public static <T> CompletableFuture<T> delayedValue(final T value,
final Duration delay) {
final CompletableFuture<T> result = new CompletableFuture<>();
EXECUTOR.schedule(() -> result.complete(value),
delay.toMillis(), TimeUnit.MILLISECONDS);
return result;
}
Его можно использовать с методами < either
<<22 > :
-
accceptEither
, acceptEitherAsync
-
applyToEither
, applyToEitherAsync
-
runAfterEither
, runAfterEitherAsync
В одном приложении используется кешированное значение, если вызов удаленной службы превышает некоторый порог задержки:
interface RemoteServiceClient {
CompletableFuture<Foo> getFoo();
}
final RemoteServiceClient client = /* ... */;
final Foo cachedFoo = /* ... */;
final Duration timeout = /* ... */;
client.getFoos()
.exceptionally(ignoredException -> cachedFoo)
.acceptEither(delayedValue(cachedFoo, timeout),
foo -> /* do something with foo */)
.join();
Если вызов удаленного клиента завершается исключительно (например, SocketTimeoutException
), мы можем быстро выйти из строя и использовать кешированное значение немедленно.
CompletableFuture.anyOf(CompletableFuture<?>...)
можно комбинировать с этим примитивом delayedValue
, чтобы обернуть a CompletableFuture
с помощью указанной семантики:
@SuppressWarnings("unchecked")
public static <T> CompletableFuture<T> withDefault(final CompletableFuture<T> cf,
final T defaultValue,
final Duration timeout) {
return (CompletableFuture<T>) CompletableFuture.anyOf(
cf.exceptionally(ignoredException -> defaultValue),
delayedValue(defaultValue, timeout));
}
Это значительно упрощает приведенный выше пример вызова удаленной службы:
withDefault(client.getFoos(), cachedFoo, timeout)
.thenAccept(foo -> /* do something with foo */)
.join();
CompletableFuture
более точно обозначаются promises, поскольку они отделяют создание Future
от его завершения. Обязательно используйте выделенные пулы потоков для работы с большим процессором. Чтобы создать CompletableFuture
для дорогостоящего вычисления, вы должны использовать перегрузку CompletableFuture#supplyAsync(Supplier, Executor)
, так как перегрузка #supplyAsync(Supplier)
по умолчанию равна общему ForkJoinPool
. Возвращенный CompletableFuture
не смог отменить свою задачу, так как эта функциональность не отображается интерфейсом Executor
. В более общем случае зависимые CompletableFuture
не отменяют своих родителей, например. cf.thenApply(f).cancel(true)
не отменяет cf
. Я бы рекомендовал придерживаться Future
, возвращенного ExecutorService
, если вам нужна эта функциональность.