Динамическое изменение размера java.util.concurrent.ThreadPoolExecutor при выполнении задач ожидания

Я работаю с java.util.concurrent.ThreadPoolExecutor для обработки нескольких элементов параллельно. Хотя сама потоковая обработка работает нормально, иногда мы сталкиваемся с другими ограничениями ресурсов из-за действий, происходящих в потоках, что заставляло нас набирать количество потоков в пуле.

Я хотел бы знать, есть ли способ набрать число потоков, пока потоки фактически работают. Я знаю, что вы можете вызывать setMaximumPoolSize() и/или setCorePoolSize(), но это только изменяет размер пула после того, как потоки становятся бездействующими, но они не простаивают, пока в очереди не будет задач.

Ответы

Ответ 1

Насколько я могу судить, это невозможно в чистом виде.

Вы можете реализовать метод beforeExecute, чтобы проверить некоторое логическое значение и заставить потоки временно останавливаться. Имейте в виду, что они будут содержать задачу, которая не будет выполняться до тех пор, пока они не будут повторно включены.

Кроме того, вы можете реализовать afterExecute, чтобы вызывать исключение RuntimeException, когда вы насыщены. Это эффективно приведет к тому, что Thread умрет, и поскольку Исполнитель будет превышать максимальный, никакой новый не будет создан.

Я тоже не рекомендую. Вместо этого попробуйте найти другой способ управления одновременным выполнением задач, которые вызывают проблему. Возможно, выполнив их в отдельном пуле потоков с более ограниченным числом рабочих.

Ответ 2

Вы абсолютно можете. Вызов setCorePoolSize(int) изменит размер ядра пула. Вызовы этого метода являются потокобезопасными и переопределяющими настройками, предоставляемыми конструктору ThreadPoolExecutor. Если вы обрезаете размер пула, оставшиеся потоки будут отключены после завершения их текущей очереди заданий (если они неактивны, они будут немедленно отключены). Если вы увеличиваете размер пула, новые потоки будут распределены как можно скорее. Временные рамки для распределения новых потоков недокументированы - но в реализации распределение новых потоков выполняется при каждом вызове метода execute.

Чтобы связать это с рабочей фермой заданий, зависящей от времени выполнения, вы можете открыть это свойство (либо оболочкой, либо используя динамический экспортер MBean) в качестве атрибута JMX для чтения и записи, чтобы создать довольно приятный, "на лету" настраиваемый пакетный процессор.

Чтобы уменьшить размер пула принудительно во время выполнения (который является вашим запросом), вы должны подклассифицировать ThreadPoolExecutor и добавить нарушение метода beforeExecute(Thread,Runnable). Прерывание потока не является достаточным нарушением, поскольку оно взаимодействует только с состояниями ожидания, а во время обработки потоки задач ThreadPoolExecutor не переходят в состояние прерывания.

Недавно у меня была такая же проблема, пытаясь заставить пул потоков принудительно завершить работу перед выполнением всех поставленных задач. Чтобы это произошло, я прервал поток, выбросив исключение во время выполнения только после замены UncaughtExceptionHandler потока на тот, который ожидает мое конкретное исключение и отбрасывает его.

/**
 * A runtime exception used to prematurely terminate threads in this pool.
 */
static class ShutdownException
extends RuntimeException {
    ShutdownException (String message) {
        super(message);
    }
}

/**
 * This uncaught exception handler is used only as threads are entered into
 * their shutdown state.
 */
static class ShutdownHandler 
implements UncaughtExceptionHandler {
    private UncaughtExceptionHandler handler;

    /**
     * Create a new shutdown handler.
     *
     * @param handler The original handler to deligate non-shutdown
     * exceptions to.
     */
    ShutdownHandler (UncaughtExceptionHandler handler) {
        this.handler = handler;
    }
    /**
     * Quietly ignore {@link ShutdownException}.
     * <p>
     * Do nothing if this is a ShutdownException, this is just to prevent
     * logging an uncaught exception which is expected.  Otherwise forward
     * it to the thread group handler (which may hand it off to the default
     * uncaught exception handler).
     * </p>
     */
    public void uncaughtException (Thread thread, Throwable throwable) {
        if (!(throwable instanceof ShutdownException)) {
            /* Use the original exception handler if one is available,
             * otherwise use the group exception handler.
             */
            if (handler != null) {
                handler.uncaughtException(thread, throwable);
            }
        }
    }
}
/**
 * Configure the given job as a spring bean.
 *
 * <p>Given a runnable task, configure it as a prototype spring bean,
 * injecting any necessary dependencices.</p>
 *
 * @param thread The thread the task will be executed in.
 * @param job The job to configure.
 *
 * @throws IllegalStateException if any error occurs.
 */
protected void beforeExecute (final Thread thread, final Runnable job) {
    /* If we're in shutdown, it because spring is in singleton shutdown
     * mode.  This means we must not attempt to configure the bean, but
     * rather we must exit immediately (prematurely, even).
     */
    if (!this.isShutdown()) {
        if (factory == null) {
            throw new IllegalStateException(
                "This class must be instantiated by spring"
                );
        }

        factory.configureBean(job, job.getClass().getName());
    }
    else {
        /* If we are in shutdown mode, replace the job on the queue so the
         * next process will see it and it won't get dropped.  Further,
         * interrupt this thread so it will no longer process jobs.  This
         * deviates from the existing behavior of shutdown().
         */
        workQueue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

В вашем случае вы можете захотеть создать семафор (только для использования в качестве счетчика потокобезопасности), который не имеет разрешений, а при выключении потоков выдает ему несколько разрешений, соответствующих дельтам предыдущего пула ядра размер и новый размер пула (требуется переопределить метод setCorePoolSize(int)). Это позволит вам завершить ваши потоки после завершения их текущей задачи.

private Semaphore terminations = new Semaphore(0);

protected void beforeExecute (final Thread thread, final Runnable job) {
    if (terminations.tryAcquire()) {
        /* Replace this item in the queue so it may be executed by another
         * thread
         */
        queue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

public void setCorePoolSize (final int size) {
    int delta = getActiveCount() - size;

    super.setCorePoolSize(size);

    if (delta > 0) {
        terminations.release(delta);
    }
}

Это должно прерывать n потоков для запроса f (n) = active. Если есть какая-либо проблема, стратегия распределения ThreadPoolExecutor довольно долговечна. Он хранит досрочное завершение с использованием блока finally, который гарантирует выполнение. По этой причине, даже если вы завершаете слишком много потоков, они будут повторно заполнены.

Ответ 3

Решение состоит в том, чтобы слить очередь ThreadPoolExecutor, установить размер ThreadPoolExecutor по мере необходимости, а затем добавить потоки один за другим, как только остальные завершатся. Метод для извлечения очереди в классе ThreadPoolExecutor является закрытым, поэтому вы должны создать его самостоятельно. Вот код:

/**
 * Drains the task queue into a new list. Used by shutdownNow.
 * Call only while holding main lock.
 */
public static List<Runnable> drainQueue() {
    List<Runnable> taskList = new ArrayList<Runnable>();
    BlockingQueue<Runnable> workQueue = executor.getQueue();
    workQueue.drainTo(taskList);
    /*
     * If the queue is a DelayQueue or any other kind of queue
     * for which poll or drainTo may fail to remove some elements,
     * we need to manually traverse and remove remaining tasks.
     * To guarantee atomicity wrt other threads using this queue,
     * we need to create a new iterator for each element removed.
     */
    while (!workQueue.isEmpty()) {
        Iterator<Runnable> it = workQueue.iterator();
        try {
            if (it.hasNext()) {
                Runnable r = it.next();
                if (workQueue.remove(r))
                    taskList.add(r);
            }
        } catch (ConcurrentModificationException ignore) {
        }
    }
    return taskList;
}

Перед вызовом этого метода вам нужно получить и затем отпустить основную блокировку. Для этого вам нужно использовать java-отражение, потому что поле "mainLock" является закрытым. Опять же, вот код:

private Field getMainLock() throws NoSuchFieldException {
    Field mainLock = executor.getClass().getDeclaredField("mainLock");
    mainLock.setAccessible(true);
    return mainLock;
}

Где "исполнитель" - ваш ThreadPoolExecutor.

Теперь вам нужны методы блокировки/разблокировки:

public void lock() {
    try {
        Field mainLock = getMainLock();
        Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch {
        ...
    } 
}

public void unlock() {
    try {
        Field mainLock = getMainLock();
        mainLock.setAccessible(true);
        Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch {
        ...
    }  
}

Наконец, вы можете написать свой метод setThreadsNumber, и он будет работать как с увеличением, так и с уменьшением размера ThreadPoolExecutor:

public void setThreadsNumber(int intValue) {
    boolean increasing = intValue > executor.getPoolSize();
    executor.setCorePoolSize(intValue);
    executor.setMaximumPoolSize(intValue);
    if(increasing){
        if(drainedQueue != null && (drainedQueue.size() > 0)){
            executor.submit(drainedQueue.remove(0));
        }
    } else {
        if(drainedQueue == null){
            lock();
            drainedQueue = drainQueue();
            unlock();
        }
    }
}

Примечание: очевидно, что если вы выполняете N параллельных потоков, и вы меняете это число на N-1, все N потоков будут продолжать работать. Когда первый поток заканчивается, новые потоки будут выполняться. Отныне количество параллельных потоков будет выбрано вами.

Ответ 4

Я прочитал документацию setMaximumPoolSize() и setCorePoolSize(), и кажется, что они могут привести к поведению, которое вам нужно.

- EDIT -

Мой вывод был неправильным: подробнее см. обсуждение ниже...

Ответ 5

Я тоже нуждался в том же решении, и кажется, что в JDK8 setCorePoolSize() и setMaximumPoolSize() действительно дают желаемый результат. Я сделал тестовый пример, когда я отправляю 4 задания в пул, и они выполняются последовательно, я уменьшаю размер пула во время их запуска и представляю еще одну текущую версию, которую я хочу быть одинокой. Затем я восстанавливаю исходный размер пула. Вот тестовый источник https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

Он производит следующий вывод (поток "50" - это тот, который должен выполняться изолированно)

run:
test thread 2 enter
test thread 1 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit
test thread 50 enter
test thread 50 exit
test thread 1 enter
test thread 2 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit