Как заставить ThreadPoolExecutor увеличить потоки до max перед очередью?
Некоторое время я был разочарован поведением по умолчанию ThreadPoolExecutor
, которое поддерживает пулы потоков ExecutorService
, которые используют многие из нас. Цитировать из Javadocs:
Если запущено больше потоков corePoolSize, но меньше MaximumPoolSize, новый поток будет создан , только если очередь заполнена.
Это означает, что если вы определите пул потоков с помощью следующего кода, он никогда не запустит 2-й поток, поскольку LinkedBlockingQueue
не ограничен.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Только если у вас ограниченная очередь и очередь заполнена, запускаются все потоки выше номера ядра. Я подозреваю, что многим многопоточным программистам младшего возраста Java не известно о таком поведении ThreadPoolExecutor
.
Теперь у меня есть конкретный случай использования, где это неоптимально. Я ищу способы, без написания своего собственного класса TPE, чтобы обойти это.
Мои требования касаются веб-службы, которая выполняет обратные вызовы, возможно, ненадежной третьей стороне.
- Я не хочу выполнять обратный вызов синхронно с веб-запросом, поэтому я хочу использовать пул потоков.
- Обычно я получаю пару таких минут, поэтому я не хочу иметь
newFixedThreadPool(...)
с большим количеством потоков, которые в основном неактивны.
- Время от времени я получаю всплеск этого трафика и хочу увеличить количество потоков до максимального значения (скажем, 50).
- Мне нужно сделать лучшую попытку выполнить все обратные вызовы, поэтому я хочу поставить в очередь любые дополнительные вызовы выше 50. Я не хочу перегружать остальную часть моего веб-сервера с помощью
newCachedThreadPool()
.
Как можно обойти это ограничение в ThreadPoolExecutor
, где очередь должна быть ограничена и заполнена, прежде чем будет запущено больше потоков? Как я могу заставить его запускать больше потоков перед очередью задач?
Изменить:
@Flavio хорошо описывает использование ThreadPoolExecutor.allowCoreThreadTimeOut(true)
для тайм-аута основных потоков и выхода из него. Я считал это, но я все еще хотел функцию основных потоков. Я не хотел, чтобы количество потоков в пуле упало ниже размера ядра, если это возможно.
Ответы
Ответ 1
Как я могу обойти это ограничение в ThreadPoolExecutor
, где очередь должна быть ограничена и полна, прежде чем запускается больше потоков.
Полагаю, что наконец-то я нашел несколько элегантное (возможно, немного взломанное) решение этого ограничения с помощью ThreadPoolExecutor
. Он включает в себя расширение LinkedBlockingQueue
, чтобы он возвращал false
для queue.offer(...)
, когда уже заданы некоторые задачи. Если текущие потоки не справляются с поставленными задачами, TPE добавит дополнительные потоки. Если пул уже находится в максимальных потоках, тогда будет вызываться RejectedExecutionHandler
. Это обработчик, который затем выполняет put(...)
в очередь.
Конечно, странно писать очередь, где offer(...)
может возвращать false
и put()
никогда не блокирует так, что часть взлома. Но это хорошо работает с использованием TPE очереди, поэтому я не вижу никаких проблем с этим.
Здесь код:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
/*
* Offer it to the queue if there is 0 items already queued, else
* return false so the TPE will add another thread. If we return false
* and max threads have been reached then the RejectedExecutionHandler
* will be called which will do the put into the queue.
*/
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
/*
* This does the actual put into the queue. Once the max threads
* have been reached, the tasks will then queue up.
*/
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
С помощью этого механизма, когда я отправляю задачи в очередь, ThreadPoolExecutor
будет:
- Сначала измените количество потоков до размера ядра (здесь 1).
- Предложите его в очередь. Если очередь пуста, она будет поставлена в очередь для обработки существующими потоками.
- Если очередь имеет уже 1 или более элементов,
offer(...)
вернет false.
- Если возвращается false, увеличьте количество потоков в пуле до тех пор, пока они не достигнут максимального числа (здесь 50).
- Если на макс, то он вызывает
RejectedExecutionHandler
- Затем
RejectedExecutionHandler
помещает задачу в очередь, обрабатываемую первым доступным потоком в порядке FIFO.
Хотя в приведенном выше примере кода очередь не ограничена, вы также можете определить ее как ограниченную очередь. Например, если вы добавите емкость 1000 в LinkedBlockingQueue
, то она будет:
- масштабировать потоки до max
- тогда очередь до тех пор, пока не будет заполнено 1000 задач.
- затем заблокируйте вызывающего абонента, пока пространство не станет доступным для очереди.
Кроме того, если вам действительно нужно использовать offer(...)
в
RejectedExecutionHandler
, тогда вы можете использовать метод offer(E, long, TimeUnit)
вместо Long.MAX_VALUE
в качестве тайм-аута.
Edit:
Я изменил метод my offer(...)
для обратной обратной связи @Ralf. Это только увеличит количество потоков в пуле, если они не будут поддерживать нагрузку.
Edit:
Еще одна настройка этого ответа может состоять в том, чтобы спросить TPE, есть ли простаивающие потоки и только поставить в очередь элемент, если это так. Вам нужно было бы сделать настоящий класс для этого и добавить на него метод ourQueue.setThreadPoolExecutor(tpe);
.
Тогда ваш метод offer(...)
может выглядеть примерно так:
- Убедитесь, что
tpe.getPoolSize() == tpe.getMaximumPoolSize()
в этом случае просто вызывает super.offer(...)
.
- В противном случае, если
tpe.getPoolSize() > tpe.getActiveCount()
, тогда вызовите super.offer(...)
, поскольку там, как представляется, нет простоя.
- В противном случае верните
false
, чтобы разветкить другой поток.
Возможно, это:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
Обратите внимание, что методы get на TPE являются дорогостоящими, так как они обращаются к полям volatile
или (в случае getActiveCount()
) блокируют TPE и просматривают список нитей. Кроме того, здесь есть условия гонки, которые могут привести к неправильной постановке задачи или другой вилке, когда был простаивающий поток.
Ответ 2
У меня уже есть два других ответа на этот вопрос, но я подозреваю, что этот является лучшим.
Он основан на методике принятого в настоящее время ответа, а именно:
- Переопределите метод очереди
offer()
, чтобы (иногда) возвращать false,
- что заставляет
ThreadPoolExecutor
либо создавать новый поток, либо отклонять задачу, и
- установите
RejectedExecutionHandler
для фактической постановки задачи в очередь на отклонение.
Проблема в том, что offer()
должен возвращать false. Принятый в настоящее время ответ возвращает false, когда в очереди есть несколько задач, но, как я уже отмечал в своем комментарии, это вызывает нежелательные последствия. С другой стороны, если вы всегда возвращаете false, вы будете продолжать создавать новые потоки, даже если у вас есть потоки, ожидающие в очереди.
Решением является использование Java 7 LinkedTransferQueue
и вызов offer()
tryTransfer()
. Когда есть ожидающий потребительский поток, задача будет просто передана этому потоку. В противном случае offer()
вернет false, а ThreadPoolExecutor
создаст новый поток.
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
return tryTransfer(e);
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Ответ 3
Установите размер ядра и максимальный размер на одно и то же значение и разрешите удаление основных потоков из пула с помощью allowCoreThreadTimeOut(true)
.
Ответ 4
Примечание: теперь я предпочитаю и рекомендую мой другой ответ.
Здесь версия, которая мне кажется гораздо более простой: увеличьте corePoolSize (вплоть до предела MaximumPoolSize) всякий раз, когда выполняется новая задача, а затем уменьшите corePoolSize (вплоть до предела указанного пользователем размера ядра пула) ) всякий раз, когда задача завершается.
Другими словами, отслеживайте количество запущенных или установленных в очереди задач и убедитесь, что corePoolSize равен количеству задач, пока он находится между указанным пользователем "размером пула пула" и максимальным размером пакета.
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
private int userSpecifiedCorePoolSize;
private int taskCount;
public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
userSpecifiedCorePoolSize = corePoolSize;
}
@Override
public void execute(Runnable runnable) {
synchronized (this) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (this) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}
private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
setCorePoolSize(threads);
}
}
Как написано, класс не поддерживает изменение указанного пользователем corePoolSize или maximumPoolSize после построения и не поддерживает управление рабочей очередью напрямую или через remove()
или purge()
.
Ответ 5
У нас есть подкласс ThreadPoolExecutor
, который принимает дополнительный creationThreshold
и переопределяет execute
.
public void execute(Runnable command) {
super.execute(command);
final int poolSize = getPoolSize();
if (poolSize < getMaximumPoolSize()) {
if (getQueue().size() > creationThreshold) {
synchronized (this) {
setCorePoolSize(poolSize + 1);
setCorePoolSize(poolSize);
}
}
}
}
возможно, это тоже помогает, но ваш взгляд выглядит более вычурным, конечно...
Ответ 6
Рекомендуемый ответ решает только одну (1) проблему с пулом потоков JDK:
Пулы потоков JDK смещены в сторону очередей. Таким образом, вместо создания нового потока, они поставят задачу в очередь. Только если очередь достигнет своего предела, пул потоков создаст новый поток.
Удаление нитей не происходит, когда нагрузка уменьшается. Например, если у нас есть множество заданий, попадающих в пул, из-за которых пул достигает максимума, а затем выполняется легкая загрузка не более 2 задач одновременно, пул будет использовать все потоки для обслуживания малой нагрузки, предотвращая удаление потоков. (понадобятся только 2 темы...)
Недовольный описанным выше поведением, я пошел дальше и создал пул, чтобы преодолеть недостатки, перечисленные выше.
Решить 2) Использование планирования Lifo решает проблему. Эта идея была представлена Беном Маурером на конференции ACM Applicative 2015:
Systems @Facebook Scale
Так родилась новая реализация:
LifoThreadPoolExecutorSQP
Пока что эта реализация улучшает производительность асинхронного выполнения для ZEL.
Реализация способна вращаться, чтобы уменьшить издержки переключения контекста, обеспечивая превосходную производительность для определенных случаев использования.
Надеюсь, это поможет...
PS: JDK Fork Join Pool реализует ExecutorService и работает как "нормальный" пул потоков, Реализация является производительной, Она использует планирование потоков LIFO, однако нет никакого контроля над внутренним размером очереди, тайм-аутом выхода на пенсию..., и что наиболее важно, задачи не могут быть прервано при отмене их
Ответ 7
Примечание: теперь я предпочитаю и рекомендую мой другой ответ.
У меня есть другое предложение, следуя первоначальной идее изменения очереди для возврата false. В этом случае все задачи могут входить в очередь, но всякий раз, когда задача ставится в очередь после execute()
, мы выполняем ее с помощью задачи no-op sentinel, которую очередь отклоняет, вызывая появление нового потока, который будет выполнять no-op за которым сразу следует что-то из очереди.
Поскольку рабочие потоки могут опросить LinkedBlockingQueue
для новой задачи, возможно, что задача может попасть в очередь, даже когда имеется доступный поток. Чтобы избежать появления новых потоков, даже когда есть потоки, нам нужно отслеживать, сколько потоков ожидает новых задач в очереди, и только порождать новый поток, когда в очереди больше задач, чем ожидающих потоков.
final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };
final AtomicInteger waitingThreads = new AtomicInteger(0);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
// offer returning false will cause the executor to spawn a new thread
if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
else return super.offer(e);
}
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.poll(timeout, unit);
} finally {
waitingThreads.decrementAndGet();
}
}
@Override
public Runnable take() throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.take();
} finally {
waitingThreads.decrementAndGet();
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
@Override
public void execute(Runnable command) {
super.execute(command);
if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
}
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r == SENTINEL_NO_OP) return;
else throw new RejectedExecutionException();
}
});
Ответ 8
Лучшее решение, о котором я могу думать, это расширить.
ThreadPoolExecutor
предлагает несколько методов hook: beforeExecute
и afterExecute
. В своем расширении вы можете поддерживать ограниченную очередь для подачи задач и вторую неограниченную очередь для обработки переполнения. Когда кто-то вызывает submit
, вы можете попытаться поместить запрос в ограниченную очередь. Если вы встретили исключение, вы просто ставите задачу в очередь переполнения. Затем вы можете использовать крюк afterExecute
, чтобы увидеть, есть ли что-либо в очереди переполнения после завершения задачи. Таким образом, исполнитель сначала позаботится о том, что в нем находится ограниченная очередь, и автоматически выйдет из этой неограниченной очереди, как позволяет время.
Кажется, что больше работы, чем ваше решение, но, по крайней мере, это не связано с непредвиденным поведением очередей. Я также думаю, что есть лучший способ проверить статус очереди и потоков, а не полагаться на исключения, которые довольно медленно бросать.