Понимание Java FixedThreadPool
Я пытаюсь понять, как работает Java FixedThreadPool на практике, но документы не отвечают на мой вопрос.
Предположите простой сценарий, например:
ExecutorService ES= Executors.newFixedThreadPool(3);
List<Future> FL;
for(int i=1;i<=200;i++){
FL.add(ES.submit(new Task()));
}
ES.shutdown();
где Task
- это Callable
, которые строят некоторые ресурсы, используют их и возвращают некоторый вывод.
Мой вопрос: сколько Task
есть в памяти после завершения цикла for
? Другими словами: будет ли только 3 Task
в то время, когда они создадут свои ресурсы, или все они созданы заранее, так что после .submit
у меня есть 200 Task
(и их ресурсы), ожидающие выполнения?
Примечание: создание ресурсов происходит в конструкторе Task
, а не в методе call()
.
В javadoc (не стесняйтесь пропустить следующее): меня смущает следующее объяснение в документах Java
Создает пул потоков, который повторно использует фиксированное число потоков, работающих от общей неограниченной очереди. В любой момент, не более nThreads потоков будут активными задачами обработки.
Я полагаю, это означает, что в моем примере все 200 задач находятся в очереди, но только три из них выполняются в любое время.
Любая помощь приветствуется.
Ответы
Ответ 1
Ваш код эквивалентен
for (int i = 1; i <= 200; i++){
Task t = new Task();
FL.add(ES.submit(t));
}
И после цикла for конструктор Task, таким образом, был вызван 200 раз, а содержавшийся в нем код был таким образом выполнен 200 раз. Независимо от того, передана ли задача исполнителю или нет, это не имеет значения: вы вызываете конструктор 200 раз в цикле, а после каждая задача была построена, она подчиняется исполнителю. Исполнитель не тот, кто вызывает конструктор задачи.
Ответ 2
Задачи будут удаляться один за другим из очереди, так как выполнение продолжается, задачи будут удалены, и только результат их будет сохранен в этих объектах Future.
Итак, в основном в памяти:
3 Темы
200 → 0 Задача
0 → 200 Будущее
(с каждым выполненным заданием)
Ответ 3
Вы создаете 200 объектов с помощью new Task()
, и эти задачи передаются исполнителю. Исполнители ссылаются на этот объект Task
.
Итак, если в конструкторе Task
вы создаете и удерживаете ресурсы, тогда все 200 задач будут содержать ресурсы.
Если возможно, вы можете создать и использовать ресурс в методе вызова Task
, если вы не хотите, чтобы 200 экземпляров создавали и удерживали ресурсы. В этом случае только 3 Task
за раз создадут и удерживают ресурс.
Ответ 4
Все 200 задач создаются и потребляют ресурсы, и все они находятся в очереди.
Пул потоков, только вызывает их метод run()/call() один за другим, когда свободный поток доступен для выполнения.
Ответ 5
Чтобы понять это, вам нужно будет увидеть, что происходит, когда вы отправляете задачу Исполнителю в цикле.
Сначала мы просто посмотрим на представление одной задачи Исполнителю. Теперь я буду ссылаться на исходный код JDK 1.7.0_51
Метод static Executor.newFixedThreadPool
возвращает ThreadPoolExecutor
, содержащий блокирующую очередь для хранения задачи
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
В тот момент, когда вы добавляете задачу к этому Исполнителю, это переходит к методу отправки ThreadPoolExecutor
, расширяющему AbstractExecutorService
, где записана реализация метода отправки.
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
Метод execute является специфичным для реализации (это означает, что различные типы Executor реализуют его разными способами)
Теперь идет настоящее мясо. Это метод выполнения, определенный в ThreadPoolExecutor
. Особенно обратите внимание на комментарии.
Здесь вступают в игру несколько параметров конфигурации ThreadPoolExecutor, таких как corePoolSize
.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}