Почему параллельный поток не использует все потоки ForkJoinPool?
Итак, я знаю, что если вы используете parallelStream
без специального ForkJoinPool, он будет использовать ForkJoinPool по умолчанию, который по умолчанию имеет меньше потоков, чем у вас есть.
Итак, как указано здесь (а также в другом ответе этого вопроса), чтобы иметь больше parallelism, вы должны:
отправьте выполнение параллельного потока на свой собственный ForkJoinPool: yourFJP.submit(() → stream.parallel(). forEach (doSomething));
Итак, я сделал это:
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool(1000);
IntStream stream = IntStream.range(0, 999999);
final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());
forkJoinPool.submit(() -> {
stream.parallel().forEach(n -> {
System.out.println("Processing n: " + n);
try {
Thread.sleep(500);
thNames.add(Thread.currentThread().getName());
System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}).get();
}
}
Я создал набор имен потоков, чтобы узнать, сколько потоков создается, а также зарегистрировано количество активных потоков, которые имеет пул, и оба числа не вырастают больше, чем 16, поэтому это означает, что parallelism здесь не более 16 (почему даже 16?). Если я не использую forkJoinPool, я получаю 4 как parallelism, что соответствует количеству процессоров, которые у меня есть.
Почему это дает мне 16, а не 1000?
Ответы
Ответ 1
Обновление
Первоначально этот ответ был подробным объяснением, в котором утверждается, что ForkJoinPool
применяет противодавление и даже не достигает заданного уровня parallelism, потому что для обработки потока всегда есть свободные рабочие.
Это неверно.
Фактический ответ указан в исходном вопросе, на который он был отмечен как дубликат, - использование пользовательской ForkJoinPool
для обработки потока официально не поддерживается, а при использовании forEach
пул по умолчанию parallelism используется для определить поведение разделителя потока.
Здесь пример того, как при выполнении задач вручную отправляется пользовательский ForkJoinPool
, счетчик активных потоков пула легко достигает уровня parallelism:
for (int i = 0; i < 1_000_000; ++i) {
forkJoinPool.submit(() -> {
try {
Thread.sleep(1);
thNames.add(Thread.currentThread().getName());
System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
Благодаря Stuart Marks для указания этого и Sotirios Delimanolis, утверждая, что мой первоначальный ответ неверен:)
Ответ 2
Мне кажется, что когда вы отправляете лямбду FJP, лямбда будет использовать общий пул, а не FJP. Сотириос Делиманолис доказал это своим замечанием выше. То, что вы отправляете, - это задача, которая выполняется в вашем FJP.
Попробуйте профилировать этот код, чтобы увидеть, какие потоки фактически используются.
Вы не можете называть потоки внутри FJP.