Как использовать AsyncRestTemplate для одновременного выполнения нескольких вызовов?
Я не понимаю, как эффективно использовать AsyncRestTemplate
для совершения внешних вызовов службы. Для кода ниже:
class Foo {
public void doStuff() {
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(
url1, String.class);
String response1 = future1.get();
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
url2, String.class);
String response2 = future2.get();
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(
url3, String.class);
String response3 = future3.get();
}
}
В идеале я хочу выполнить все 3 вызова одновременно и обрабатывать результаты, как только они будут выполнены. Однако каждый внешний вызов службы не выбирается до тех пор, пока get()
не будет вызван, а get()
будет заблокирован. Так разве это не побеждает цель AsyncRestTemplate
? Я мог бы также использовать RestTemplate
.
Итак, я не понимаю, как я могу заставить их работать одновременно?
Ответы
Ответ 1
Просто не вызывайте блокировку get()
перед отправкой всех ваших асинхронных вызовов:
class Foo {
public void doStuff() {
ListenableFuture<ResponseEntity<String>> future1 = asyncRestTemplate
.getForEntity(url1, String.class);
ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate
.getForEntity(url2, String.class);
ListenableFuture<ResponseEntity<String>> future3 = asyncRestTemplate
.getForEntity(url3, String.class);
String response1 = future1.get();
String response2 = future2.get();
String response3 = future3.get();
}
}
Вы можете делать как отправка, так и get в циклах, но обратите внимание, что текущий сбор результатов неэффективен, поскольку он застрял в следующем незавершенном будущем.
Вы можете добавить все фьючерсы в коллекцию и протестировать ее, проверяя каждое будущее для неблокирования isDone()
. Когда этот вызов возвращает true, вы можете вызвать get()
.
Таким образом, ваш массовый сбор результатов будет оптимизирован, а не ждет следующего медленного будущего результата в порядке вызова get()
s.
Лучше все же вы можете зарегистрировать обратные вызовы (промежутки времени) в каждом ListenableFuture
, возвращенные AccyncRestTemplate
, и вам не нужно беспокоиться о циклическом анализе потенциальных результатов.
Ответ 2
Если вам не нужно использовать "AsyncRestTemplate", я бы предложил использовать RxJava. RxJava zip оператор - это то, что вы ищете. Проверьте код ниже:
private rx.Observable<String> externalCall(String url, int delayMilliseconds) {
return rx.Observable.create(
subscriber -> {
try {
Thread.sleep(delayMilliseconds); //simulate long operation
subscriber.onNext("response(" + url + ") ");
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
);
}
public void callServices() {
rx.Observable<String> call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread());
rx.Observable<String> call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread());
rx.Observable<String> call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread());
rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3)
.subscribeOn(Schedulers.newThread())
.subscribe(response -> System.out.println("done with: " + response));
}
Все запросы к внешним службам будут выполняться в отдельных потоках, когда последний вызов будет завершен, функция преобразования (в примере простая конкатенация строк) будет применена, и результат (конкатенированная строка) будет заменен на "zip" наблюдаемый.
Ответ 3
Что я понимаю по вашему вопросу? У вас есть предопределенный асинхронный метод, и вы пытаетесь сделать это методом асинхронно, используя класс RestTemplate.
Я написал метод, который поможет вам вызывать ваш метод асинхронно.
public void testMyAsynchronousMethod(String... args) throws Exception {
// Start the clock
long start = System.currentTimeMillis();
// Kick of multiple, asynchronous lookups
Future<String> future1 = asyncRestTemplate
.getForEntity(url1, String.class);;
Future<String> future2 = asyncRestTemplate
.getForEntity(url2, String.class);
Future<String> future3 = asyncRestTemplate
.getForEntity(url3, String.class);
// Wait until they are all done
while (!(future1 .isDone() && future2.isDone() && future3.isDone())) {
Thread.sleep(10); //10-millisecond pause between each check
}
// Print results, including elapsed time
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
}
Ответ 4
Возможно, вы захотите использовать класс CompletableFuture
(javadoc).
-
Преобразуйте свои вызовы в CompletableFuture
. Например.
final CompletableFuture<ResponseEntity<String>> cf = CompletableFuture.supplyAsync(() -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
-
Следующий вызов CompletableFuture::allOf
с тремя новыми созданными фьючерсами.
-
Вызвать метод join()
на результат. После того, как окончательное окончательное решение будет разрешено, вы можете получить результаты из каждого отдельного завершенного будущего, которое вы создали на шаге 3.
Ответ 5
Я не думаю, что любой из предыдущих ответов действительно достигнет parallelism. Проблема с ответом @diginoise заключается в том, что он фактически не достигает parallelism. Как только мы назовем get
, мы заблокированы. Учтите, что вызовы очень медленные, поэтому future1
занимает 3 секунды, а future2
2 секунды и future3
3 секунды. При вызове 3 get
один за другим, мы в конечном итоге ожидаем 3 + 2 + 3 = 8 секунд.
@Vikrant Kashyap также отвечает на while (!(future1 .isDone() && future2.isDone() && future3.isDone()))
. Кроме того, цикл while
- довольно уродливый вид кода для 3 фьючерсов, что, если у вас есть больше? Ответ @lkz использует другую технологию, чем вы просили, и даже тогда я не уверен, что zip
собирается выполнить эту работу. Из Observable Javadoc:
zip применяет эту функцию в строгой последовательности, поэтому первый элемент излучаемый новым Observable, будет результатом функции применяется к первому элементу, испускаемому каждым из источников Observables; второй элемент, излучаемый новым Наблюдаемым, будет результатом функция, применяемая ко второму элементу, испускаемому каждым из этих Наблюдаемые; и т.д.
Благодаря широкой популярности Spring, они очень стараются поддерживать обратную совместимость и при этом иногда компрометируют API. AsyncRestTemplate
методы, возвращающие ListenableFuture
, являются одним из таких случаев. Если они привязаны к Java 8+, вместо него можно использовать CompletableFuture
. Зачем? Поскольку мы не будем иметь дело с пулами потоков напрямую, у нас нет хорошего способа узнать, когда все функции ListenableFutures завершены. CompletableFuture
имеет метод allOf
, который создает новый CompletableFuture
, который завершается, когда все заданные CompletableFutures завершаются. Поскольку мы не имеем этого в ListenableFuture
, нам придется импровизировать.
Я не компилировал следующий код, но должно быть ясно, что я пытаюсь сделать. Я использую Java 8, потому что в конце 2016 года.
// Lombok FTW
@RequiredArgsConstructor
public final class CounterCallback implements ListenableFutureCallback<ResponseEntity<String>> {
private final LongAdder adder;
public void onFailure(Throwable ex) {
adder.increment();
}
public void onSuccess(ResponseEntity<String> result) {
adder.increment();
}
}
ListenableFuture<ResponseEntity<String>> f1 = asyncRestTemplate
.getForEntity(url1, String.class);
f1.addCallback(//);
// more futures
LongAdder adder = new LongAdder();
ListenableFutureCallback<ResponseEntity<String>> callback = new CounterCallback(adder);
Stream.of(f1, f2, f3)
.forEach {f -> f.addCallback(callback)}
for (int counter = 1; adder.sum() < 3 && counter < 10; counter++) {
Thread.sleep(1000);
}
// either all futures are done or we're done waiting
Map<Boolean, ResponseEntity<String>> futures = Stream.of(f1, f2, f3)
.collect(Collectors.partitioningBy(Future::isDone));
Теперь у нас есть Map
, для которого futures.get(Boolean.TRUE)
предоставит нам все завершенные фьючерсы, а futures.get(Boolean.FALSE)
предоставит нам те, которые этого не сделали. Мы хотим отменить те, которые не были завершены.
Этот код делает несколько вещей, которые важны при параллельном программировании:
- Он не блокируется.
- Он ограничивает операцию до некоторого максимально допустимого времени.
- Он четко разделяет успешные и неудачные случаи.
Ответ 6
Я думаю, вы здесь недооцениваете кое-что. Когда вы вызываете метод getForEntity, запросы уже запускаются. Когда вызывается метод get() будущего объекта, вы просто ожидаете завершения запроса. Поэтому, чтобы уволить все эти три запроса в одном и том же субсектонике, вам просто нужно сделать:
// Each of the lines below will fire an http request when it executed
Future<ResponseEntity<String>> future1 = asyncRestTemplate.getForEntity(url1, String.class);
Future<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(url2, String.class);
Future<ResponseEntity<String>> future3 = asyncRestTemplate.getForEntity(url3, String.class);
После того как все эти коды будут запущены, все запросы уже запущены (скорее всего, в одном и том же подсетоде). Тогда вы можете делать все, что хотите, между тем. Как только вы вызываете какой-либо метод get(), вы ожидаете завершения каждого запроса. Если они уже завершены, он сразу же вернется.
// do whatever you want in the meantime
// get the response of the http call and wait if it not completed
String response1 = future1.get();
String response2 = future2.get();
String response3 = future3.get();