ExecutorService, стандартный способ избежать слишком большой очереди задач
Я использую ExecutorService для удобства параллельной многопоточной программы. Возьмите следующий код:
while(xxx)
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
Future<..> ... = exService.submit(..);
...
}
В моем случае проблема заключается в том, что submit() не блокируется, если заняты все NUMBER_THREADS. Следствием этого является то, что очередь задач наводняется многими задачами. Следствием этого является то, что закрытие службы выполнения с помощью ExecutorService.shutdown() занимает много времени (ExecutorService.isTerminated() будет ложным в течение длительного времени). Причина в том, что очередь задач все еще достаточно полная.
В настоящее время моим обходным решением является работа с семафорами, чтобы запретить иметь много записей внутри очереди задач ExecutorService:
...
Semaphore semaphore=new Semaphore(NUMBER_THREADS);
while(xxx)
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS);
...
semaphore.aquire();
// internally the task calls a finish callback, which invokes semaphore.release()
// -> now another task is added to queue
Future<..> ... = exService.submit(..);
...
}
Я уверен, что есть лучшее более инкапсулированное решение?
Ответы
Ответ 1
Хитрость заключается в использовании фиксированного размера очереди и:
new ThreadPoolExecutor.CallerRunsPolicy()
Я также рекомендую использовать Guava ListeningExecutorService.
Вот пример очередей потребителей/производителей.
private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
return new ThreadPoolExecutor(nThreads, nThreads,
5000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
}
Что-нибудь лучше, и вы можете рассмотреть MQ, например, RabbitMQ или ActiveMQ, поскольку у них есть технология QoS.
Ответ 2
Вы можете ThreadPoolExecutor.getQueue(). size(), чтобы узнать размер очереди ожидания. Вы можете выполнить действие, если очередь слишком длинная. Я предлагаю запустить задачу в текущем потоке, если очередь слишком длинная, чтобы замедлить работу производителя (если это подходит)
Ответ 3
Вам лучше создать ThreadPoolExecutor (что и делает Executors.newXXX()).
В конструкторе вы можете передать BlockingQueue для Исполнителя для использования в качестве своей очереди задач. Если вы передадите размер BlockingQueue с ограничением размера (например, LinkedBlockingQueue), он должен достичь желаемого эффекта.
ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize));
Ответ 4
Истинная блокировка ThreadPoolExecutor была в списке желаний многих, там даже ошибка JDC открылась на нем.
Я столкнулся с той же проблемой и наткнулся на это:
http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html
Это реализация BlockingThreadPoolExecutor, реализованная с использованием RejectionPolicy, которая использует предложение для добавления задачи в очередь, ожидая, что очередь будет иметь место. Выглядит хорошо.
Ответ 5
вы можете добавить еще одну бляковую очередь с ограниченным размером, чтобы контролировать размер внутренней очереди в executorService, некоторые думают как семафор, но очень легко.
перед исполнителем вы ставите(), и когда задается задача take(). take() должен находиться внутри кода задачи