Элегантное внедрение индикаторов длины очереди в ExecutorServices
Почему, а почему java.util.concurrent
не предоставляет индикаторы длины очереди для своих ExecutorService
s? Недавно я обнаружил, что делаю что-то вроде этого:
ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...
public void addTaskToQueue(Runnable runnable) {
if (queueLength.get() < MAX_QUEUE_LENGTH) {
queueLength.incrementAndGet(); // Increment queue when submitting task.
queue.submit(new Runnable() {
public void run() {
runnable.run();
queueLength.decrementAndGet(); // Decrement queue when task done.
}
});
} else {
// Trigger error: too long queue
}
}
Что работает нормально, но... Я думаю, что это действительно должно быть реализовано как часть ExecutorService
. Он немой и ошибка, склонная переносить счетчик, отделенный от фактической очереди, длина которой указывает счетчик (напоминает мне массивы C). Но ExecutorService
получаются с помощью статических методов factory, поэтому нет возможности просто расширить отличный исполнитель одиночного потока и добавить счетчик очереди. Итак, что мне делать:
- Reinvent, уже реализованный в JDK?
- Другое умное решение?
Ответы
Ответ 1
Существует более прямой способ:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
// add jobs
// ...
int size = executor.getQueue().size();
Хотя вы можете не использовать удобные методы создания Executor, а скорее создать исполнителя непосредственно, чтобы избавиться от приведения, и, таким образом, убедитесь, что исполнитель всегда будет ThreadPoolExecutor
, даже если реализация Executors.newSingleThreadExecutor
изменится когда-нибудь.
ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>() );
Это прямое копирование из Executors.newSingleThreadExecutor
в JDK 1.6. LinkedBlockingQueue
, который передается конструктору, на самом деле является тем самым объектом, с которого вы вернетесь из getQueue
.
Ответ 2
Пока вы можете напрямую проверить размер очереди. Другой способ справиться с очередью, которая слишком длинна, - сделать внутреннюю очередь ограниченной.
public static
ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true));
}
Это приведет к RejectedExecutionExceptions (см. this), когда вы превысите предел.
Если вы хотите избежать исключения, вызывающий поток может быть захвачен для выполнения функции. См. этот вопрос SO для решения.
Ссылки