Политика ThreadPoolExecutor
Я пытаюсь использовать ThreadPoolExecutor для планирования задач, но в некоторых проблемах с его политиками. Здесь его заявленное поведение:
- Если работает меньше потоков corePoolSize, Executor всегда предпочитает добавлять новый поток, а не выполнять очередность.
- Если запускается corePoolSize или больше потоков, Executor всегда предпочитает размещать запрос, а не добавлять новый поток.
- Если запрос не может быть поставлен в очередь, создается новый поток, если это не будет превышать maximumPoolSize, и в этом случае задача будет отклонена.
Поведение, которое я хочу, следующее:
- как указано выше
- Если работает больше, чем corePoolSize, но меньше потоков MaximumPoolSize, предпочитает добавлять новый поток в очередь и использовать простаивающий поток для добавления нового потока.
- как указано выше
В принципе, я не хочу, чтобы какие-либо задачи были отклонены; Я хочу, чтобы они были поставлены в очередь в неограниченной очереди. Но я хочу иметь до максимум потоковPoolSize. Если я использую неограниченную очередь, она никогда не генерирует потоки после того, как она ударит coreSize. Если я использую ограниченную очередь, он отклоняет задачи. Есть ли способ обойти это?
Теперь я думаю о запуске ThreadPoolExecutor на SynchronousQueue, но не напрямую направляет задачи на него, а вместо этого направляет их в отдельный неограниченный LinkedBlockingQueue. Затем другой поток подается из LinkedBlockingQueue в Executor, и если он отклоняется, он просто пытается снова, пока он не будет отклонен. Это похоже на боль и немного взлома, однако - есть ли более чистый способ сделать это?
Ответы
Ответ 1
Ваш случай использования является общим, полностью законным и, к сожалению, более сложным, чем можно было бы ожидать. Для получения дополнительной информации вы можете прочитать это обсуждение и найти указатель на решение (также упоминаемое в потоке) здесь. Решение Shay работает нормально.
Как правило, я буду немного опасаться неограниченных очередей; обычно лучше иметь явное управление входящим потоком, которое изящно деградирует и регулирует отношение текущей/оставшейся работы, чтобы не перегружать ни производителя, ни потребителя.
Ответ 2
Возможно, нет необходимости микроконтролировать пул потоков как запрошенный.
Пул кэшированных потоков будет использовать неиспользуемые потоки, а также потенциально неограниченные параллельные потоки. Это, конечно же, может привести к ухудшению производительности от сбоев контекста в периоды прерывания.
Executors.newCachedThreadPool();
Лучшим вариантом является установка ограничения на общее количество потоков, в то время как отбрасывание понятия обеспечения простаивающих потоков используется в первую очередь. Изменения конфигурации:
corePoolSize = maximumPoolSize = N;
allowCoreThreadTimeOut(true);
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS);
Рассуждение по этому сценарию, если исполнитель имеет меньше чем corePoolSize
потоков, чем он не должен быть очень занят. Если система не очень занята, тогда нет никакого вреда в разворачивании нового потока. Это приведет к тому, что ваш ThreadPoolExecutor
всегда будет создавать нового работника, если он находится под максимальным количеством разрешенных рабочих. Только тогда, когда максимальное число рабочих "запущено", работникам, ожидающим без дела для задач, будут заданы задачи. Если работник ждет aReasonableTimeDuration
без задачи, то ему разрешается завершить работу. Используя разумные пределы для размера пула (в конце концов, есть только так много процессоров) и достаточно большой тайм-аут (чтобы потоки не нуждались в завершении), вероятно, будут видны желаемые преимущества.
Последний вариант - хакерский. В основном, ThreadPoolExecutor
внутренне использует BlockingQueue.offer
, чтобы определить, имеет ли очередь емкость. Пользовательская реализация BlockingQueue
всегда может отклонить попытку offer
. Когда ThreadPoolExecutor
не выполняет offer
задачу в очереди, она попытается создать нового рабочего. Если новый рабочий не может быть создан, будет вызываться RejectedExecutionHandler
. В этот момент пользовательский RejectedExecutionHandler
может заставить put
добавить пользовательский BlockingQueue
.
/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler {
BlockingQueue<T> delegate;
public boolean offer(T item) {
return false;
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
delegate.put(r);
}
//.... delegate methods
}
Ответ 3
Просто установите corePoolsize = maximumPoolSize
и используйте неограниченную очередь?
В вашем списке точек 1 исключает 2, так как corePoolSize
всегда будет меньше или равно maximumPoolSize
.
Edit
Есть еще что-то несовместимое между тем, что вы хотите, и тем, что TPE предложит вам.
Если у вас есть неограниченная очередь, maximumPoolSize
игнорируется, поэтому, как вы заметили, будут созданы и используются не более corePoolSize
потоки.
Итак, опять же, если вы берете corePoolsize = maximumPoolSize
с неограниченной очередью, у вас есть то, что вы хотите, нет?
Ответ 4
Будете ли вы искать нечто вроде кэшированного пула потоков?
http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool()