Каков наилучший/самый элегантный способ ограничить количество параллельных вычислений (например, с помощью fixedThreadPool) в параллельных потоках
Предположим, что лямбда-выражение потребляет определенное количество ресурса (например, памяти), которое ограничено, и требует ограничить количество одновременных исполнений (пример: если лямбда временно потребляет 100 МБ (локальной памяти), и нам нравится ограничивать это до 1 ГБ, мы не допускаем более 10 параллельных оценок).
Каков наилучший способ ограничить количество одновременных действий, например, например, в
IntStream.range(0, numberOfJobs).parallel().foreach( i -> { /*...*/ });
?
Примечание. Очевидным вариантом является выполнение вложенности типа
double jobsPerThread = (double)numberOfJobs / numberOfThreads;
IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));
Это единственный способ? Это не так элегантно. На самом деле я хотел бы иметь
IntStream.range(0, numberOfJobs).parallel(numberOfThreads).foreach( i -> { /*...*/ });
Ответы
Ответ 1
Stream
используйте ForkJoinPool
для параллельных операций. По умолчанию они используют ForkJoinPool.commonPool()
, который впоследствии не позволяет изменять concurrency. Однако вы можете использовать свой собственный экземпляр ForkJoinPool
. Когда вы выполняете код потока в контексте вашего собственного ForkJoinPool
, этот пул контекстов будет использоваться для операций потока. Следующий пример иллюстрирует это, выполняя одну и ту же операцию один раз с использованием поведения по умолчанию и один раз используя собственный пул с фиксированным concurrency из 2
:
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class InterfaceStaticMethod {
public static void main(String[] arg) throws Exception {
Runnable parallelCode=() -> {
HashSet<String> allThreads=new HashSet<>();
IntStream.range(0, 1_000_000).parallel().filter(i->{
allThreads.add(Thread.currentThread().getName()); return false;}
).min();
System.out.println("executed by "+allThreads);
};
System.out.println("default behavior: ");
parallelCode.run();
System.out.println("specialized pool:");
ForkJoinPool pool=new ForkJoinPool(2);
pool.submit(parallelCode).get();
}
}
Ответ 2
В зависимости от вашего используемого варианта использование методов утилиты CompletableFuture
может быть проще:
import static java.util.concurrent.CompletableFuture.runAsync;
ExecutorService executor = Executors.newFixedThreadPool(10); //max 10 threads
for (int i = 0; i < numberOfJobs; i++) {
runAsync(() -> /* do something with i */, executor);
}
//or with a stream:
IntStream.range(0, numberOfJobs)
.forEach(i -> runAsync(() -> /* do something with i */, executor));
Основное отличие от вашего кода состоит в том, что параллельный forEach
будет возвращаться только после завершения последнего задания, тогда как runAsync
вернется, как только все задания будут отправлены. Существуют различные способы изменения этого поведения, если это необходимо.