Выполнение параллельного метода из метода вызова

У меня есть библиотека, которая используется клиентом, и они передают объект DataRequest, который имеет userid, timeout и некоторые другие поля в нем. Теперь я использую этот объект DataRequest для создания URL-адреса, а затем я делаю HTTP-вызов с помощью RestTemplate, и моя служба возвращает ответ JSON, который я использую для создания объекта DataResponse и возвращает этот объект DataResponse назад к ним.

Ниже мой класс DataClient, используемый клиентом, передавая ему объект DataRequest. Я использую значение тайм-аута, переданное клиентом в DataRequest, чтобы пропустить запрос, если он занимает слишком много времени в методе getSyncData.

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask класс:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

В настоящее время мой класс DataFetcherTask отвечает за один ключ DataRequest, как показано выше.

Проблема: -

Теперь у меня небольшое изменение дизайна. Клиент передаст объект DataRequest (например, keyA) в мою библиотеку, а затем я сделаю новый HTTP-вызов другой службе (которую я не делаю в своем текущем проекте), используя идентификатор пользователя, присутствующий в DataRequest (keyA) объект, который вернет мне список идентификаторов пользователя, поэтому я буду использовать этот идентификатор пользователя и сделать несколько других DataRequest (keyB, keyC, keyD) объектов по одному для каждого идентификатора пользователя, возвращаемого в ответ. И тогда у меня будет объект List<DataRequest>, который будет иметь объекты keyB, keyC и keyD DataRequest. Максимальным элементом в List<DataRequest> будет три, все.

Теперь для каждого из объектов DataRequest в List<DataRequest> я хочу выполнить выше DataFetcherTask.call метод параллельно, а затем сделать List<DataResponse>, добавив каждый DataResponse для каждого ключа. Поэтому у меня будет три параллельных вызова DataFetcherTask.call. Идея этого параллельного вызова состоит в том, чтобы получить данные для всех этих максимальных трех ключей в одном глобальном значении таймаута.

Итак, мое предложение - DataFetcherTask класс вернет обратно List<DataResponse> объект вместо DataResponse, а затем изменит подпись метода getSyncData и getAsyncData. Итак, вот алгоритм:

  • Использовать объект DataRequest, переданный клиентом, сделать List<DataRequest>, вызвав другую службу HTTP.
  • Сделайте параллельный вызов для каждого метода DataRequest в List<DataRequest> до DataFetcherTask.call и верните объект List<DataResponse> клиенту вместо DataResponse.

Таким образом, я могу применить один и тот же глобальный тайм-аут на шаге 1 вместе с шагом 2. Если какой-либо из вышеперечисленных шагов занимает время, мы просто перейдем к методу getSyncData.

DataFetcherTask после изменения дизайна:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}

Теперь мой вопрос -

  • Должно ли это быть таким? Каков правильный дизайн для решения этой проблемы? Я имею в виду, что метод call в другом методе call выглядит странным?
  • Нужно ли иметь двух исполнителей, подобных мне в моем коде? Есть ли лучший способ решить эту проблему или любое изменение упрощения/дизайна, которое мы можем здесь сделать?

Я упростил код, чтобы идея поняла, что я пытаюсь сделать.

Ответы

Ответ 1

Как уже упоминалось в комментариях к вашему вопросу, вы можете использовать фреймворк Java ForkJoin. Это избавит вас от лишнего пула потоков в вашем DataFetcherTask.

Вам просто нужно использовать ForkJoinPool в DataClient и преобразовать DataFetcherTask в RecursiveTask (один из подтипов ForkJoinTask). Это позволяет вам легко выполнять другие подзадачи параллельно.

Итак, после этих изменений ваш код будет выглядеть примерно так:

DataFetcherTask

DataFetcherTask теперь RecursiveTask, который сначала генерирует ключи и вызывает подзадачи для каждого сгенерированного ключа. Эти подзадачи выполняются в том же ForkJoinPool как родительская задача.

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in contructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

DataClient

DataClient не будет сильно изменяться, кроме нового пула потоков:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responsList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responsList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}

Как только вы на Java8, вы можете рассмотреть возможность изменения реализации на CompletableFuture s. Тогда он будет выглядеть примерно так:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}

Как упоминалось в комментариях, Guava ListenableFuture обеспечит аналогичную функциональность для Java7, но без Lambdas они, как правило, становятся неуклюжими.

Ответ 2

Как я знаю, RestTemplate блокируется, он указан в ForkJoinPool JavaDoc в ForkJoinTask:

Вычисления должны избегать синхронизированных методов или блоков и должны свести к минимуму другую блокирующую синхронизацию, помимо объединения других задач или использования синхронизаторов, таких как Phasers, которые рекламируются для сотрудничества с планированием fork/join....
Задачи также не должны выполнять блокировку IO,...

Входящий вызов является избыточным.
И вам не нужны два исполнителя. Также вы можете вернуть частичный результат в getSyncData(DataRequest key). Это можно сделать так

DataClient.java

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> responseList = null;
        DataFetcherResult response = null;
        try {
            response = getAsyncData(key);
            responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response.cancel(true);
            responseList = response.getPartialResult();
        }
        return responseList;
    }

    @Override
    public DataFetcherResult getAsyncData(DataRequest key) {
        List<DataRequest> keys = generateKeys(key);
        final List<Future<DataResponse>> responseList = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
        for (final DataRequest _key : keys) {
            responseList.add(service.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    DataResponse response = null;
                    try {
                        response = performDataRequest(_key);
                    } finally {
                        latch.countDown();
                        return response;
                    }
                }
            }));
        }
        return new DataFetcherResult(responseList, latch);
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys(DataRequest key) {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
        return null;
    }
}

DataFetcherResult.java

public class DataFetcherResult implements Future<List<DataResponse>> {
    final List<Future<DataResponse>> futures;
    final CountDownLatch latch;

    public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
        this.futures = futures;
        this.latch = latch;
    }

    //non-blocking
    public List<DataResponse> getPartialResult() {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            try {
                result.add(future.isDone() ? future.get() : null);
                //instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                //ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
                //you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
            }
        }
        return result;
    }

    @Override
    public List<DataResponse> get() throws ExecutionException, InterruptedException {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            result.add(future.get());
        }
        return result;
    }

    @Override
    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (latch.await(timeout, timeUnit)) {
            return get();
        }
        throw new TimeoutException();//or getPartialResult()
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.isCancelled();
        }
        return cancelled;
    }

    @Override
    public boolean isDone() {
        boolean done = true;
        for (Future<DataResponse> future : futures) {
            done &= future.isDone();
        }
        return done;
    }

    //and etc.
}

Я написал его с помощью CountDownLatch, и он отлично выглядит, но обратите внимание, что есть нюанс. Вы можете немного застрять в DataFetcherResult.get(long timeout, TimeUnit timeUnit), потому что CountDownLatch не синхронизируется с будущим состоянием. И может случиться, что latch.getCount() == 0, но не все фьючерсы будут возвращать future.isDone() == true одновременно. Поскольку они уже прошли latch.countDown(); внутри finally {} Callable block, но не изменили внутренний state, который по-прежнему равен NEW.
И поэтому вызов get() внутри get(long timeout, TimeUnit timeUnit) может вызвать небольшую задержку.
Аналогичный случай был описан здесь.

Получить с таймаутом DataFetcherResult.get(...) можно переписать с использованием фьючерсов future.get(long timeout, TimeUnit timeUnit), и вы можете удалить CountDownLatch из класса.

public List<DataResponse> get(long timeout, TimeUnit timeUnit)
        throws ExecutionException, InterruptedException{
    List<DataResponse> result = new ArrayList<>(futures.size());
    long timeoutMs = timeUnit.toMillis(timeout);
    boolean timeout = false;
    for (Future<DataResponse> future : futures) {
        long beforeGet = System.currentTimeMillis();
        try {
            if (!timeout && timeoutMs > 0) {
                result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                timeoutMs -= System.currentTimeMillis() - beforeGet;
            } else {
                if (future.isDone()) {
                    result.add(future.get());
                } else {
                    //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                }
            }
        } catch (TimeoutException e) {
            result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
            timeout = true;
        }
        //you can also handle ExecutionException or CancellationException here
    }

    return result;
}

Этот код был приведен в качестве примера, и он должен быть протестирован перед использованием на производстве, но кажется законным:)