Как подождать завершения ThreadPoolExecutor
Мой вопрос: Как выполнить кучу поточных объектов на ThreadPoolExecutor
и дождаться их завершения до перехода
Я новичок в ThreadPoolExecutor. Таким образом, этот код является испытанием, чтобы узнать, как он работает. Прямо сейчас я даже не заполняю BlockingQueue
объектами, потому что я не понимаю, как запустить очередь, не вызывая execute()
с другим RunnableObject
. Во всяком случае, прямо сейчас я просто звоню awaitTermination()
, но я думаю, что я все еще что-то пропустил. Любые советы были бы замечательными! Спасибо.
public void testThreadPoolExecutor() throws InterruptedException {
int limit = 20;
BlockingQueue q = new ArrayBlockingQueue(limit);
ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
ex.execute(new RunnableObject(i + 1));
}
ex.awaitTermination(2, TimeUnit.SECONDS);
System.out.println("finished");
}
Класс RunnableObject:
package playground;
public class RunnableObject implements Runnable {
private final int id;
public RunnableObject(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println("ID: " + id + " started");
try {
Thread.sleep(2354);
} catch (InterruptedException ignore) {
}
System.out.println("ID: " + id + " ended");
}
}
Ответы
Ответ 1
Вы должны зациклиться на awaitTermination
ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
log.info("Awaiting completion of threads.");
}
Ответ 2
Кажется, что проблема заключается в том, что вы не вызываете shutdown
после того, как вы отправили все задания в свой пул. Без shutdown()
ваш awaitTermination
всегда будет возвращать false.
ThreadPoolExecutor ex =
new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);
Вы также можете сделать что-то вроде следующего, чтобы дождаться завершения всех ваших заданий:
List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
// this joins with the submitted job
future.get();
}
...
// still need to shutdown at the end
ex.shutdown();
Кроме того, поскольку вы спали за 2354
миллисекунды, но только для ожидания завершения всех заданий для 2
SECONDS
, awaitTermination
всегда будет возвращать false
.
Наконец, похоже, что вы беспокоитесь о создании нового ThreadPoolExecutor
, и вместо этого вы хотите повторно использовать первый. Не будь. Накладные расходы GC будут крайне минимальными по сравнению с любым кодом, который вы пишете, чтобы определить, завершены ли задания.
Для цитирования из javadocs, ThreadPoolExecutor.shutdown()
:
Инициирует упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты. Вызов не имеет дополнительного эффекта, если он уже выключен.
В методе ThreadPoolExecutor.awaitTermination(...)
он ожидает, пока состояние исполнителя перейдет к TERMINATED
. Но сначала состояние должно перейти в shutdown
, если вызывается shutdown()
или STOP
, если вызывается shutdownNow()
.
Ответ 3
Это не имеет никакого отношения к самому исполнителю. Просто используйте интерфейс java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>)
. Он будет блокироваться до тех пор, пока все Callable
не будут завершены.
Исполнители должны быть долговечными; за пределами жизни группы задач. shutdown
предназначен для завершения и очистки приложения.
Ответ 4
Другой подход - использовать CompletionService, очень полезно, если вам нужно попытаться выполнить любой результат задачи:
//run 3 task at time
final int numParallelThreads = 3;
//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
int numTaskToStart = 15;
for(int i=0; i<numTaskToStart ; i++){
//task class that implements Callable<String> (or something you need)
MyTask mt = new MyTask();
completionService.submit(mt);
}
executor.shutdown(); //it cannot be queued more task
try {
for (int t = 0; t < numTaskToStart ; t++) {
Future<String> f = completionService.take();
String result = f.get();
// ... something to do ...
}
} catch (InterruptedException e) {
//termination of all started tasks (it returns all not started tasks in queue)
executor.shutdownNow();
} catch (ExecutionException e) {
// ... something to catch ...
}
Ответ 5
Здесь вариант с принятым ответом, который обрабатывает повторы, если/при вызове InterruptedException:
executor.shutdown();
boolean isWait = true;
while (isWait)
{
try
{
isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
if (isWait)
{
log.info("Awaiting completion of bulk callback threads.");
}
} catch (InterruptedException e) {
log.debug("Interruped while awaiting completion of callback threads - trying again...");
}
}
Ответ 6
Попробуйте это,
ThreadPoolExecutor ex =
new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
ex.execute(new RunnableObject(i + 1));
}
Линии, которые нужно добавить
ex.shutdown();
ex.awaitTermination(timeout, unit)