Java ExecutorService: waitTermination всех рекурсивно созданных задач
Я использую ExecutorService
для выполнения задачи. Эта задача может рекурсивно создавать другие задачи, которые передаются в один и тот же ExecutorService
, и эти дочерние задачи тоже могут это сделать.
У меня теперь есть проблема, что я хочу подождать, пока все задачи не будут выполнены (то есть все задачи закончены, и они не отправят новые), прежде чем продолжить.
Я не могу вызвать ExecutorService.shutdown()
в основном потоке, потому что это предотвращает принятие новых задач с помощью ExecutorService
.
И Вызов ExecutorService.awaitTermination()
кажется, ничего не делает, если shutdown
не был вызван.
Таким образом, я немного застрял здесь. Это не может быть так трудно для ExecutorService
видеть, что все рабочие бездействуют, не так ли? Единственное неэффективное решение, которое я мог бы придумать, - это прямое использование ThreadPoolExecutor
и запрос его getPoolSize()
каждый раз в то время. Неужели нет лучшего способа сделать это?
Ответы
Ответ 1
Если количество задач в дереве рекурсивных задач изначально неизвестно, возможно, самым простым способом было бы реализовать свой собственный примитив синхронизации, какой-то "обратный семафор" и поделиться им между вашими задачами. Перед отправкой каждой задачи вы увеличиваете значение, когда задача завершается, она уменьшает это значение, и вы ждете, пока значение не будет 0.
Реализация его как отдельного примитива, явно вызванного из задач, отделяет эту логику от реализации пула потоков и позволяет вам представить несколько независимых деревьев рекурсивных задач в один и тот же пул.
Что-то вроде этого:
public class InverseSemaphore {
private int value = 0;
private Object lock = new Object();
public void beforeSubmit() {
synchronized(lock) {
value++;
}
}
public void taskCompleted() {
synchronized(lock) {
value--;
if (value == 0) lock.notifyAll();
}
}
public void awaitCompletion() throws InterruptedException {
synchronized(lock) {
while (value > 0) lock.wait();
}
}
}
Обратите внимание, что taskCompleted()
следует вызывать внутри блока finally
, чтобы он не зависел от возможных исключений.
Также обратите внимание, что beforeSubmit()
должен вызываться подающим потоком перед тем, как задача будет отправлена, а не самой задачей, чтобы избежать возможного "ложного завершения", когда старые задачи завершены, а новые еще не запущены.
EDIT: Важная проблема с фиксированным шаблоном использования.
Ответ 2
Это действительно идеальный кандидат на Phaser. Java 7 выходит с этим новым классом. Его гибкий CountdonwLatch/CyclicBarrier. Вы можете получить стабильную версию на JSR 166 Interest Site.
Способ, которым он является более гибким CountdownLatch/CyclicBarrier, заключается в том, что он способен не только поддерживать неизвестное количество сторон (потоков), но и его повторно использовать (то есть, где входит фазовая часть)
Для каждой заданной вами задачи вы должны зарегистрироваться, когда эта задача будет завершена, вы приедете. Это можно сделать рекурсивно.
Phaser phaser = new Phaser();
ExecutorService e = //
Runnable recursiveRunnable = new Runnable(){
public void run(){
//do work recursively if you have to
if(shouldBeRecursive){
phaser.register();
e.submit(recursiveRunnable);
}
phaser.arrive();
}
}
public void doWork(){
int phase = phaser.getPhase();
phaser.register();
e.submit(recursiveRunnable);
phaser.awaitAdvance(phase);
}
Изменить: Спасибо @depthofreality за указание условия гонки в моем предыдущем примере. Я обновляю его так, чтобы исполняемый поток только ожидал перехода к текущей фазе, когда он блокирует выполнение рекурсивной функции.
Номер фазы не будет отключен до номера arrive
== register
s. Поскольку перед каждым рекурсивным вызовом вызывается register
, инкремент фазы будет выполняться, когда все вызовы завершены.
Ответ 3
Ничего себе, вы, ребята, быстро:)
Спасибо за все предложения. Фьючерсы нелегко интегрируются с моей моделью, потому что я не знаю, сколько runnables запланировано заранее. Поэтому, если я оставлю родительскую задачу в ожидании завершения рекурсивных дочерних задач, у меня много мусора.
Я решил свою проблему, используя предложение AtomicInteger. По сути, я подклассифицировал ThreadPoolExecutor и увеличил счетчик на вызовы execute() и уменьшил на вызовы afterExecute(). Когда счетчик получает 0, я вызываю shutdown(). Это, похоже, работает на мои проблемы, не уверен, что это вообще хороший способ сделать это. В частности, я предполагаю, что вы используете только execute() для добавления Runnables.
В качестве стороны node: я сначала попытался проверить afterExecute() количество Runnables в очереди и число рабочих, которые активны и завершены, когда они равны 0; но это не сработало, потому что не все Runnables появились в очереди, и getActiveCount() не делал того, что ожидал.
Во всяком случае, здесь мое решение: (если кто-то найдет серьезные проблемы с этим, сообщите мне:)
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger executing = new AtomicInteger(0);
public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
TimeUnit seconds, BlockingQueue<Runnable> queue) {
super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
}
@Override
public void execute(Runnable command) {
//intercepting beforeExecute is too late!
//execute() is called in the parent thread before it terminates
executing.incrementAndGet();
super.execute(command);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
int count = executing.decrementAndGet();
if(count == 0) {
this.shutdown();
}
}
}
Ответ 4
Вы можете создать свой собственный пул потоков, который расширяет ThreadPoolExecutor. Вы хотите знать, когда была отправлена задача и когда она завершается.
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
private int counter = 0;
public MyThreadPoolExecutor() {
super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
}
@Override
public synchronized void execute(Runnable command) {
counter++;
super.execute(command);
}
@Override
protected synchronized void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
counter--;
notifyAll();
}
public synchronized void waitForExecuted() throws InterruptedException {
while (counter == 0)
wait();
}
}
Ответ 5
Используйте Future для своих задач (вместо отправки Runnable
), обратный вызов обновляет его, когда он будет завершен, поэтому вы можете использовать Future.isDone, чтобы отслеживать состояние всех ваших задач.
Ответ 6
Используйте CountDownLatch.
Передайте объект CountDownLatch для каждой из ваших задач и запрограммируйте свои задачи, как показано ниже.
public void doTask() {
// do your task
latch.countDown();
}
В то время как поток, который должен ждать, должен выполнить следующий код:
public void doWait() {
latch.await();
}
Но, конечно, это предполагает, что вы уже знаете количество дочерних задач, чтобы вы могли инициализировать счет защелки.
Ответ 7
(mea culpa: его "бит" мимо моего сна;), но здесь первая попытка динамической защелки):
package oss.alphazero.sto4958330;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class DynamicCountDownLatch {
@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {
private final CountDownLatch toplatch;
public Sync() {
setState(0);
this.toplatch = new CountDownLatch(1);
}
@Override
protected int tryAcquireShared(int acquires){
try {
toplatch.await();
}
catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
return getState() == 0 ? 1 : -1;
}
public boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public boolean tryExtendState(int acquires) {
for (;;) {
int s = getState();
int exts = s+1;
if (compareAndSetState(s, exts)) {
toplatch.countDown();
return exts > 0;
}
}
}
}
private final Sync sync;
public DynamicCountDownLatch(){
this.sync = new Sync();
}
public void await()
throws InterruptedException
{
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
{
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public void join() {
sync.tryExtendState(1);
}
}
Эта защелка вводит новый метод join() в существующий (клонированный) API CountDownLatch, который используется задачами, чтобы сигнализировать о входе в большую группу задач.
Защелка проходит от родительской задачи к дочерней задаче. Каждая задача должна была бы по одному шаблону Suraj сначала "join()" защелку, выполнить свою задачу(), а затем countDown().
Чтобы устранить ситуации, когда основной поток запускает группу задач, а затем сразу ждет() - до того, как какой-либо из потоков задач имел возможность даже присоединиться() - используется topLatch
int inner Sync
класс. Это защелка, которая будет подсчитываться по каждому соединению(); Конечно, только первый обратный отсчет является значительным, так как все последующие - nops.
В начальной реализации выше вводится семантическая морщина, потому что tryAcquiredShared (int) не должен бросать InterruptedException, но тогда нам нужно иметь дело с прерыванием в ожидании на topLatch.
Это улучшение по сравнению с собственным решением OP с использованием счетчиков Atomic? Я бы сказал, что, вероятно, не IFF, он настаивает на использовании Executors, но я считаю, что такой же приемлемый альтернативный подход с использованием AQS в этом случае, а также применим к генерическим потокам.
Откажитесь от других хакеров.
Ответ 8
Если вы хотите использовать классы JSR166y - например, Phaser или Fork/Join - любой из них может работать для вас, вы всегда можете загрузить их backback из них: http://gee.cs.oswego.edu/dl/concurrency-interest/ и использовать что в качестве основы, а не для написания полностью доморощенного решения. Затем, когда выйдет 7, вы можете просто сбросить зависимость от backport и изменить несколько имен пакетов.
(Полное раскрытие: мы использовали LinkedTransferQueue в prod на некоторое время сейчас. Нет проблем)
Ответ 9
Я должен сказать, что решения, описанные выше проблемы с рекурсивной вызывающей задачей и ожидания задач конечного подотряда, не удовлетворяют меня. Есть мое решение, вдохновленное оригинальной документацией от Oracle: CountDownLatch и пример там: Человеческие ресурсы CountDownLatch.
Первый общий поток в процессе, например класс HRManagerCompact, имеет задержку ожидания для двух дочерних потоков, у которых есть ожидающие защелки для последующих двух дочерних потоков... и т.д.
Конечно, защелка может быть установлена на другое значение, чем 2 (в конструкторе CountDownLatch), а также количество запущенных объектов может быть установлено в итерации, то есть в ArrayList, но оно должно соответствовать (количество обратных отсчетов должно быть равный параметру в конструкторе CountDownLatch).
Будьте осторожны, количество защелок увеличивается экспоненциально в соответствии с условием ограничения:
'level.get() < 2 ', а также количество объектов. 1, 2, 4, 8, 16... и защелки 0, 1, 2, 4... Как вы можете видеть, для четырех уровней (level.get() < 4) будет 15 ожидающих потоков и 7 защелки во времени, когда работают пиковые пики 16.
package processes.countdownlatch.hr;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/** Recursively latching running classes to wait for the peak threads
*
* @author hariprasad
*/
public class HRManagerCompact extends Thread {
final int N = 2; // number of daughter tasks for latch
CountDownLatch countDownLatch;
CountDownLatch originCountDownLatch;
AtomicInteger level = new AtomicInteger(0);
AtomicLong order = new AtomicLong(0); // id latched thread waiting for
HRManagerCompact techLead1 = null;
HRManagerCompact techLead2 = null;
HRManagerCompact techLead3 = null;
// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
AtomicInteger level, AtomicLong order){
super(name);
this.originCountDownLatch=countDownLatch;
this.level = level;
this.order = order;
}
private void doIt() {
countDownLatch = new CountDownLatch(N);
AtomicInteger leveli = new AtomicInteger(level.get() + 1);
AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
//techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);
techLead1.start();
techLead2.start();
//techLead3.start();
try {
synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
countDownLatch.await(); // wait actual thread
}
System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
Thread.sleep(10*level.intValue());
if (level.get() < 2) doIt();
Thread.yield();
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/*catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}*/
// TODO Auto-generated method stub
System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
originCountDownLatch.countDown(); // count down
}
public static void main(String args[]){
AtomicInteger levelzero = new AtomicInteger(0);
HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
hr.doIt();
}
}
Возможный комментарий (с некоторой вероятностью):
first: working... 1, 1, 10 // thread 1, first daughter task (10)
second: working... 1, 1, 11 // thread 1, second daughter task (11)
first: working... 2, 10, 12 // thread 10, first daughter task (12)
first: working... 2, 11, 14 // thread 11, first daughter task (14)
second: working... 2, 11, 15 // thread 11, second daughter task (15)
second: working... 2, 10, 13 // thread 10, second daughter task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened
Ответ 10
Единственное нечеткое решение, которое я мог бы придумать, - это напрямую использовать ThreadPoolExecutor и запрашивать его getPoolSize() каждый раз. Неужели нет лучшего способа сделать это?
Вы должны использовать методы shutdown() ,
awaitTermination() and shutdownNow()
в правильной последовательности.
shutdown()
: инициирует упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты.
awaitTermination()
: Блокирует, пока все задачи не завершили выполнение после запроса на завершение работы или не произойдет тайм-аут, или текущий поток не будет прерван, в зависимости от того, что произойдет раньше.
shutdownNow()
: пытается остановить все активное выполнение задач, останавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.
Рекомендуемый способ из страницы документации оракула ExecutorService:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
Вы можете заменить условие с условием while в случае продолжительности выполнения задач, как показано ниже:
Измените
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
Для
while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
Thread.sleep(60000);
}
Вы можете ссылаться на другие альтернативы (кроме join()
, которые могут использоваться с автономным потоком):
дождитесь, пока все потоки закончат работу в java
Ответ 11
Вы можете использовать бегун, который отслеживает запущенные потоки:
Runner runner = Runner.runner(numberOfThreads);
runner.runIn(2, SECONDS, callable);
runner.run(callable);
// blocks until all tasks are finished (or failed)
runner.waitTillDone();
// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);
runner.waitTillDone();
// and then just kill it
runner.shutdownAndAwaitTermination();
чтобы использовать его, просто добавьте зависимость:
скомпилируйте 'com.github.matejtymes: javafixes: 1.3.0'