Как дождаться завершения всех задач в ThreadPoolExecutor, не закрывая Executor?
Я не могу использовать shutdown()
и awaitTermination()
, потому что возможно, что новые задачи будут добавлены в ThreadPoolExecutor во время ожидания.
Итак, я ищу способ подождать, пока ThreadPoolExecutor освободит его очередь и завершит все его задачи, не останавливая добавление новых задач до этой точки.
Если это имеет значение, это для Android.
Спасибо
Обновление. Через несколько недель после повторного просмотра я обнаружил, что модифицированный CountDownLatch работал лучше для меня в этом случае. Я сохраню ответ, потому что он больше подходит для того, что я спросил.
Ответы
Ответ 1
Если вам интересно узнать, когда завершена определенная задача или определенная партия задач, вы можете использовать ExecutorService.submit(Runnable)
. Вызов этого метода возвращает объект Future
, который может быть помещен в Collection
, который ваш основной поток будет повторять по вызову Future.get()
для каждого из них. Это приведет к остановке основного потока, пока ExecutorService
не обработает все задачи Runnable
.
Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
future.get();
}
Ответ 2
Мой сценарий - это веб-искатель для получения некоторой информации с веб-сайта, а затем обработки. ThreadPoolExecutor используется для ускорения процесса, потому что во время загрузки может быть загружено много страниц. Таким образом, новые задачи будут созданы в существующей задаче, потому что искатель будет следовать за гиперссылками на каждой странице. Проблема такая же: основной поток не знает, когда все задачи завершены, и он может начать обрабатывать результат. Я использую простой способ определить это. Это не очень элегантно, но работает в моем случае:
while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
Ответ 3
Возможно, вы ищете CompletionService для управления партиями задачи, см. также этот ответ.
Ответ 4
(Это попытка воспроизвести Thilo ранее, удаленный ответ с моими собственными настройками.)
Я думаю, вам, возможно, потребуется уточнить ваш вопрос, поскольку существует неявное бесконечное условие... в какой-то момент вам нужно решить закрыть своего исполнителя, и в этот момент он больше не примет никаких задач. Ваш вопрос, кажется, подразумевает, что вы хотите подождать, пока не узнаете, что дальнейшие задачи не будут отправлены, что вы можете знать только в своем собственном коде приложения.
Следующий ответ позволит вам плавно перейти на новый TPE (по какой-либо причине), заполнить все текущие задачи и не отказываться от новых задач для нового TPE. Это может ответить на ваш вопрос. @Thilo тоже может.
Предполагая, что вы определили где-то видимое TPE, используемое как таковое:
AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;
Затем вы можете записать процедуру подкачки TPE как таковую. Его также можно записать с использованием синхронизированного метода, но я думаю, что это проще:
void rotateTPE()
{
ThreadPoolExecutor newTPE = createNewTPE();
// atomic swap with publicly-visible TPE
ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
oldTPE.shutdown();
// and if you want this method to block awaiting completion of old tasks in
// the previously visible TPE
oldTPE.awaitTermination();
}
В качестве альтернативы, если вы действительно не шутите, хотите убить пул потоков, то ваша сторона-разработчик должна будет в какой-то момент решить отклоненные задачи, и вы можете использовать null
для нового TPE:
void killTPE()
{
ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
oldTPE.shutdown();
// and if you want this method to block awaiting completion of old tasks in
// the previously visible TPE
oldTPE.awaitTermination();
}
Что может вызвать проблемы с восходящим потоком, вызывающему нужно будет знать, что делать с null
.
Вы также можете поменять местами с помощью фиктивного TPE, который просто отклонил каждое новое исполнение, но это эквивалентно тому, что происходит, если вы вызываете shutdown()
в TPE.
Ответ 5
Если вы не хотите использовать shutdown
, выполните следующие подходы:
invokeAll()
в службе-исполнителе также достигает той же цели CountDownLatch
Связанный вопрос SE:
Как ждать завершения нескольких потоков?
Ответ 6
Вы можете вызвать waitTillDone() в классе Runner:
Runner runner = Runner.runner(10);
runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks
runner.waitTillDone(); // blocks until all tasks are finished (or failed)
// and now reuse it
runner.runIn(500, MILLISECONDS, callable);
runner.waitTillDone();
runner.shutdown();
Чтобы использовать его, добавьте эту зависимость gradle/maven к вашему проекту: 'com.github.matejtymes:javafixes:1.0'
Подробнее см. здесь https://github.com/MatejTymes/JavaFixes или здесь: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
Ответ 7
Попробуйте использовать размер очереди и количество активных задач, как показано ниже
while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}