Java не использует все доступные процессоры

У меня длинный расчет, который мне нужно выполнить для длинного списка входов. Расчеты независимы, поэтому я хотел бы распространять их на несколько процессоров. Я использую Java 8.

Скелет кода выглядит так:

ExecutorService executorService = Executors.newFixedThreadPool(numThreads);

MyService myService = new MyService(executorService);

List<MyResult> results =
            myInputList.stream()
                     .map(myService::getResultFuture)
                     .map(CompletableFuture::join)
                     .collect(Collectors.toList());

executorService.shutdown();

Основная функция, отвечающая за расчет, выглядит следующим образом:

CompletableFuture<MyResult> getResultFuture(MyInput input) {
    return CompletableFuture.supplyAsync(() -> longCalc(input), executor)))
}

Долгосрочный расчет является безстоящим и не выполняет никаких операций ввода-вывода.

Я бы ожидал, что этот код будет использовать все доступные процессоры, но этого не произойдет. Например, на машине с 72 процессорами и numThreads=72 (или даже, например, numThreads=500) использование процессора не более 500-1000%, как показано на htop:

htop

В соответствии с дампом потока многие из потоков вычислений ждут, например.:

"pool-1-thread-34" #55 prio=5 os_prio=0 tid=0x00007fe858597890 nid=0xd66 waiting on condition [0x00007fe7f9cdd000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000381815f20> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

Все потоки вычислений ожидали того же замка. Во время сброса только 5 потоков вычислений были RUNNABLE, остальные были ОЖИДАЕМЫ.

Что может быть причиной блокировок и почему мне не удается использовать все процессоры?

Ответы

Ответ 1

Вы отправляете задания и вызываете join() сразу после этого, ожидая завершения асинхронного задания.

Поточные промежуточные шаги выполняются по элементам, что означает, что промежуточный шаг .map(CompletableFuture::join) выполняется по одному элементу за раз (что еще хуже, поскольку это последовательный поток), не убедившись, что все элементы прошли через представление шаг. Это заставляет поток блокироваться, ожидая завершения каждого отдельного вычисления.

Вы должны обеспечить подачу всех заданий, прежде чем начинать называть join():

List<MyResult> results =
    myInputList.stream()
               .map(myService::getResultFuture)
               .collect(Collectors.toList()).stream()
               .map(CompletableFuture::join)
               .collect(Collectors.toList());

Если вы можете выразить все, что хотите сделать с списком results как действие, которое будет вызываться, когда все будет сделано, вы можете реализовать операцию таким образом, чтобы не блокировать потоки с помощью join():

List<CompletableFuture<MyResult>> futures = myInputList.stream()
    .map(myService::getResultFuture)
    .collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new))
    .thenRun(() -> {
        List<MyResult> results = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
        // perform action with results
    });

Он по-прежнему вызывает join() для получения результата, но на этом этапе все фьючерсы завершены, поэтому вызывающий объект не будет заблокирован.