Понимание Java FixedThreadPool

Я пытаюсь понять, как работает Java FixedThreadPool на практике, но документы не отвечают на мой вопрос.

Предположите простой сценарий, например:

ExecutorService ES= Executors.newFixedThreadPool(3);
List<Future> FL;
for(int i=1;i<=200;i++){
   FL.add(ES.submit(new Task()));
}
ES.shutdown();

где Task - это Callable, которые строят некоторые ресурсы, используют их и возвращают некоторый вывод.

Мой вопрос: сколько Task есть в памяти после завершения цикла for? Другими словами: будет ли только 3 Task в то время, когда они создадут свои ресурсы, или все они созданы заранее, так что после .submit у меня есть 200 Task (и их ресурсы), ожидающие выполнения?

Примечание: создание ресурсов происходит в конструкторе Task, а не в методе call().

В javadoc (не стесняйтесь пропустить следующее): меня смущает следующее объяснение в документах Java

Создает пул потоков, который повторно использует фиксированное число потоков, работающих от общей неограниченной очереди. В любой момент, не более nThreads потоков будут активными задачами обработки.

Я полагаю, это означает, что в моем примере все 200 задач находятся в очереди, но только три из них выполняются в любое время.

Любая помощь приветствуется.

Ответы

Ответ 1

Ваш код эквивалентен

for (int i = 1; i <= 200; i++){
    Task t = new Task();
    FL.add(ES.submit(t));
}

И после цикла for конструктор Task, таким образом, был вызван 200 раз, а содержавшийся в нем код был таким образом выполнен 200 раз. Независимо от того, передана ли задача исполнителю или нет, это не имеет значения: вы вызываете конструктор 200 раз в цикле, а после каждая задача была построена, она подчиняется исполнителю. Исполнитель не тот, кто вызывает конструктор задачи.

Ответ 2

Задачи будут удаляться один за другим из очереди, так как выполнение продолжается, задачи будут удалены, и только результат их будет сохранен в этих объектах Future.

Итак, в основном в памяти:

3 Темы
200 → 0 Задача
0 → 200 Будущее
(с каждым выполненным заданием)

Ответ 3

Вы создаете 200 объектов с помощью new Task(), и эти задачи передаются исполнителю. Исполнители ссылаются на этот объект Task. Итак, если в конструкторе Task вы создаете и удерживаете ресурсы, тогда все 200 задач будут содержать ресурсы.

Если возможно, вы можете создать и использовать ресурс в методе вызова Task, если вы не хотите, чтобы 200 экземпляров создавали и удерживали ресурсы. В этом случае только 3 Task за раз создадут и удерживают ресурс.

Ответ 4

Все 200 задач создаются и потребляют ресурсы, и все они находятся в очереди.

Пул потоков, только вызывает их метод run()/call() один за другим, когда свободный поток доступен для выполнения.

Ответ 5

Чтобы понять это, вам нужно будет увидеть, что происходит, когда вы отправляете задачу Исполнителю в цикле. Сначала мы просто посмотрим на представление одной задачи Исполнителю. Теперь я буду ссылаться на исходный код JDK 1.7.0_51

Метод static Executor.newFixedThreadPool возвращает ThreadPoolExecutor, содержащий блокирующую очередь для хранения задачи

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

В тот момент, когда вы добавляете задачу к этому Исполнителю, это переходит к методу отправки ThreadPoolExecutor, расширяющему AbstractExecutorService, где записана реализация метода отправки.

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

Метод execute является специфичным для реализации (это означает, что различные типы Executor реализуют его разными способами)

Теперь идет настоящее мясо. Это метод выполнения, определенный в ThreadPoolExecutor. Особенно обратите внимание на комментарии. Здесь вступают в игру несколько параметров конфигурации ThreadPoolExecutor, таких как corePoolSize.

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }