Выполнение параллельного метода из метода вызова
У меня есть библиотека, которая используется клиентом, и они передают объект DataRequest
, который имеет userid
, timeout
и некоторые другие поля в нем. Теперь я использую этот объект DataRequest
для создания URL-адреса, а затем я делаю HTTP-вызов с помощью RestTemplate
, и моя служба возвращает ответ JSON, который я использую для создания объекта DataResponse
и возвращает этот объект DataResponse
назад к ним.
Ниже мой класс DataClient
, используемый клиентом, передавая ему объект DataRequest
. Я использую значение тайм-аута, переданное клиентом в DataRequest
, чтобы пропустить запрос, если он занимает слишком много времени в методе getSyncData
.
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataRequest key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
DataFetcherTask
класс:
public class DataFetcherTask implements Callable<DataResponse> {
private DataRequest key;
private RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
// In a nutshell below is what I am doing here.
// 1. Make an url using DataRequest key.
// 2. And then execute the url RestTemplate.
// 3. Make a DataResponse object and return it.
// I am calling this whole logic in call method as LogicA
}
}
В настоящее время мой класс DataFetcherTask
отвечает за один ключ DataRequest
, как показано выше.
Проблема: -
Теперь у меня небольшое изменение дизайна. Клиент передаст объект DataRequest
(например, keyA) в мою библиотеку, а затем я сделаю новый HTTP-вызов другой службе (которую я не делаю в своем текущем проекте), используя идентификатор пользователя, присутствующий в DataRequest
(keyA) объект, который вернет мне список идентификаторов пользователя, поэтому я буду использовать этот идентификатор пользователя и сделать несколько других DataRequest
(keyB, keyC, keyD) объектов по одному для каждого идентификатора пользователя, возвращаемого в ответ. И тогда у меня будет объект List<DataRequest>
, который будет иметь объекты keyB, keyC и keyD DataRequest
. Максимальным элементом в List<DataRequest>
будет три, все.
Теперь для каждого из объектов DataRequest
в List<DataRequest>
я хочу выполнить выше DataFetcherTask.call
метод параллельно, а затем сделать List<DataResponse>
, добавив каждый DataResponse
для каждого ключа. Поэтому у меня будет три параллельных вызова DataFetcherTask.call
. Идея этого параллельного вызова состоит в том, чтобы получить данные для всех этих максимальных трех ключей в одном глобальном значении таймаута.
Итак, мое предложение - DataFetcherTask
класс вернет обратно List<DataResponse>
объект вместо DataResponse
, а затем изменит подпись метода getSyncData
и getAsyncData
. Итак, вот алгоритм:
- Использовать объект DataRequest, переданный клиентом, сделать
List<DataRequest>
, вызвав другую службу HTTP.
- Сделайте параллельный вызов для каждого метода
DataRequest
в List<DataRequest>
до DataFetcherTask.call
и верните объект List<DataResponse>
клиенту вместо DataResponse
.
Таким образом, я могу применить один и тот же глобальный тайм-аут на шаге 1 вместе с шагом 2. Если какой-либо из вышеперечисленных шагов занимает время, мы просто перейдем к методу getSyncData
.
DataFetcherTask
после изменения дизайна:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
// second executor here
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = generateKeys();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataRequest key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
}
}
Теперь мой вопрос -
- Должно ли это быть таким? Каков правильный дизайн для решения этой проблемы? Я имею в виду, что метод
call
в другом методе call
выглядит странным?
- Нужно ли иметь двух исполнителей, подобных мне в моем коде? Есть ли лучший способ решить эту проблему или любое изменение упрощения/дизайна, которое мы можем здесь сделать?
Я упростил код, чтобы идея поняла, что я пытаюсь сделать.
Ответы
Ответ 1
Как уже упоминалось в комментариях к вашему вопросу, вы можете использовать фреймворк Java ForkJoin. Это избавит вас от лишнего пула потоков в вашем DataFetcherTask
.
Вам просто нужно использовать ForkJoinPool
в DataClient
и преобразовать DataFetcherTask
в RecursiveTask
(один из подтипов ForkJoinTask
). Это позволяет вам легко выполнять другие подзадачи параллельно.
Итак, после этих изменений ваш код будет выглядеть примерно так:
DataFetcherTask
DataFetcherTask
теперь RecursiveTask
, который сначала генерирует ключи и вызывает подзадачи для каждого сгенерированного ключа. Эти подзадачи выполняются в том же ForkJoinPool
как родительская задача.
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
// TODO - Handle exception properly
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
}
}
}
DataClient
DataClient
не будет сильно изменяться, кроме нового пула потоков:
public class DataClient implements Client {
private final RestTemplate restTemplate = new RestTemplate();
// Replace the ExecutorService with a ForkJoinPool
private final ForkJoinPool service = new ForkJoinPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = getAsyncData(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
return this.service.submit(task);
}
}
Как только вы на Java8, вы можете рассмотреть возможность изменения реализации на CompletableFuture
s. Тогда он будет выглядеть примерно так:
DataClientCF
public class DataClientCF {
private final RestTemplate restTemplate = new RestTemplate();
private final ExecutorService executor = Executors.newFixedThreadPool(15);
public List<DataResponse> getData(DataRequest initialKey) {
return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
.thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
.thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
.exceptionally(t -> { throw new RuntimeException(t); })
.join();
}
private List<DataRequest> generateKeys(DataRequest key) {
return new ArrayList<>();
}
private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
}
}
Как упоминалось в комментариях, Guava ListenableFuture
обеспечит аналогичную функциональность для Java7, но без Lambdas они, как правило, становятся неуклюжими.
Ответ 2
Как я знаю, RestTemplate блокируется, он указан в ForkJoinPool JavaDoc в ForkJoinTask:
Вычисления должны избегать синхронизированных методов или блоков и должны свести к минимуму другую блокирующую синхронизацию, помимо объединения других задач или использования синхронизаторов, таких как Phasers, которые рекламируются для сотрудничества с планированием fork/join....
Задачи также не должны выполнять блокировку IO,...
Входящий вызов является избыточным.
И вам не нужны два исполнителя. Также вы можете вернуть частичный результат в getSyncData(DataRequest key)
. Это можно сделать так
DataClient.java
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> responseList = null;
DataFetcherResult response = null;
try {
response = getAsyncData(key);
responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response.cancel(true);
responseList = response.getPartialResult();
}
return responseList;
}
@Override
public DataFetcherResult getAsyncData(DataRequest key) {
List<DataRequest> keys = generateKeys(key);
final List<Future<DataResponse>> responseList = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
for (final DataRequest _key : keys) {
responseList.add(service.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
DataResponse response = null;
try {
response = performDataRequest(_key);
} finally {
latch.countDown();
return response;
}
}
}));
}
return new DataFetcherResult(responseList, latch);
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys(DataRequest key) {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
return null;
}
}
DataFetcherResult.java
public class DataFetcherResult implements Future<List<DataResponse>> {
final List<Future<DataResponse>> futures;
final CountDownLatch latch;
public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
this.futures = futures;
this.latch = latch;
}
//non-blocking
public List<DataResponse> getPartialResult() {
List<DataResponse> result = new ArrayList<>(futures.size());
for (Future<DataResponse> future : futures) {
try {
result.add(future.isDone() ? future.get() : null);
//instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
//ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
//you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
}
}
return result;
}
@Override
public List<DataResponse> get() throws ExecutionException, InterruptedException {
List<DataResponse> result = new ArrayList<>(futures.size());
for (Future<DataResponse> future : futures) {
result.add(future.get());
}
return result;
}
@Override
public List<DataResponse> get(long timeout, TimeUnit timeUnit)
throws ExecutionException, InterruptedException, TimeoutException {
if (latch.await(timeout, timeUnit)) {
return get();
}
throw new TimeoutException();//or getPartialResult()
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = true;
for (Future<DataResponse> future : futures) {
cancelled &= future.cancel(mayInterruptIfRunning);
}
return cancelled;
}
@Override
public boolean isCancelled() {
boolean cancelled = true;
for (Future<DataResponse> future : futures) {
cancelled &= future.isCancelled();
}
return cancelled;
}
@Override
public boolean isDone() {
boolean done = true;
for (Future<DataResponse> future : futures) {
done &= future.isDone();
}
return done;
}
//and etc.
}
Я написал его с помощью CountDownLatch
, и он отлично выглядит, но обратите внимание, что есть нюанс.
Вы можете немного застрять в DataFetcherResult.get(long timeout, TimeUnit timeUnit)
, потому что CountDownLatch
не синхронизируется с будущим состоянием. И может случиться, что latch.getCount() == 0
, но не все фьючерсы будут возвращать future.isDone() == true
одновременно. Поскольку они уже прошли latch.countDown();
внутри finally {}
Callable block, но не изменили внутренний state
, который по-прежнему равен NEW
.
И поэтому вызов get()
внутри get(long timeout, TimeUnit timeUnit)
может вызвать небольшую задержку.
Аналогичный случай был описан здесь.
Получить с таймаутом DataFetcherResult.get(...)
можно переписать с использованием фьючерсов future.get(long timeout, TimeUnit timeUnit)
, и вы можете удалить CountDownLatch
из класса.
public List<DataResponse> get(long timeout, TimeUnit timeUnit)
throws ExecutionException, InterruptedException{
List<DataResponse> result = new ArrayList<>(futures.size());
long timeoutMs = timeUnit.toMillis(timeout);
boolean timeout = false;
for (Future<DataResponse> future : futures) {
long beforeGet = System.currentTimeMillis();
try {
if (!timeout && timeoutMs > 0) {
result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
timeoutMs -= System.currentTimeMillis() - beforeGet;
} else {
if (future.isDone()) {
result.add(future.get());
} else {
//result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
}
}
} catch (TimeoutException e) {
result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
timeout = true;
}
//you can also handle ExecutionException or CancellationException here
}
return result;
}
Этот код был приведен в качестве примера, и он должен быть протестирован перед использованием на производстве, но кажется законным:)