С помощью Java ExecutorService, как я могу активно выполнять задачи, но останавливать обработку ожидающих задач?

Я использую ExecutorService (ThreadPoolExecutor) для запуска (и очереди) множества задач. Я пытаюсь написать некоторый код завершения, который является настолько изящным, насколько это возможно.

ExecutorService имеет два способа отключения:

  • Я могу вызвать ExecutorService.shutdown(), а затем ExecutorService.awaitTermination(...).
  • Я могу позвонить ExecutorService.shutdownNow().

В соответствии с JavaDoc команда shutdown:

Initiates an orderly shutdown in which previously submitted
tasks are executed, but no new tasks will be accepted.

И команда shutdownNow:

Attempts to stop all actively executing tasks, halts the
processing of waiting tasks, and returns a list of the tasks that were
awaiting execution.

Мне нужно что-то между этими двумя параметрами.

Я хочу вызвать команду, которая:
  а. Завершает текущую активную задачу или задачи (например, shutdown).
  б. Завершает обработку ожидающих задач (например, shutdownNow).

Например: предположим, что у меня есть ThreadPoolExecutor с 3 потоками. В настоящее время у него 50 задач в очереди, при этом первые 3 активно работают. Я хочу, чтобы эти 3 активные задачи были завершены, но я не хочу, чтобы остальные 47 задач запускались.

Я считаю, что я могу завершить работу ExecutorService таким образом, сохранив список объектов Future, а затем вызывая cancel для всех них. Но поскольку задачи передаются в этот ExecutorService из нескольких потоков, не было бы чистого способа сделать это.

Я действительно надеюсь, что мне не хватает чего-то очевидного или что есть способ сделать это чисто.

Спасибо за любую помощь.

Ответы

Ответ 1

Недавно я столкнулся с этой проблемой. Может быть более элегантный подход, но мое решение состоит в том, чтобы сначала вызвать shutdown(), затем вытащить BlockingQueue, который используется ThreadPoolExecutor, и вызвать clear() на нем (или же слить его в другой Collection для хранения). Наконец, вызов awaitTermination() позволяет пулу потоков завершить то, что в данный момент находится на его пластине.

Например:

public static void shutdownPool(boolean awaitTermination) throws InterruptedException {

    //call shutdown to prevent new tasks from being submitted
    executor.shutdown();

    //get a reference to the Queue
    final BlockingQueue<Runnable> blockingQueue = executor.getQueue();

    //clear the Queue
    blockingQueue.clear();
    //or else copy its contents here with a while loop and remove()

    //wait for active tasks to be completed
    if (awaitTermination) {
        executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
}

Этот метод будет реализован в классе directating, который обертывает ThreadPoolExecutor ссылкой executor.

Важно отметить следующее из ThreadPoolExecutor.getQueue() javadoc:

Доступ к очереди задач предназначен прежде всего для отладки и мониторинг. Эта очередь может активно использоваться. Получение очереди задач не препятствует выполнению задач, стоящих в очереди.

Это подчеркивает тот факт, что дополнительные задачи могут быть опрошены из BlockingQueue, в то время как вы его истощаете. Тем не менее, все реализации BlockingQueue являются потокобезопасными в соответствии с этой документацией интерфейса, поэтому это не должно вызывать проблем.

Ответ 2

Вы можете обернуть каждую отправленную задачу немного дополнительной логикой

wrapper = new Runnable()
    public void run()
        if(executorService.isShutdown())
            throw new Error("shutdown");
        task.run();

executorService.submit(wrapper);

накладные расходы дополнительной проверки незначительны. После того, как исполнитель выключится, обертки все равно будут выполнены, но исходные задачи не будут.

Ответ 3

shutdownNow() - это именно то, что вам нужно. Вы пропустили первое слово "Попытки" и весь второй абзац javadoc:

Нет никаких гарантий, кроме лучших попыток прекратить обработку, активно выполняющую задачи. Например, типичные реализации будут отменены с помощью Thread.interrupt(), поэтому любая задача, которая не отвечает на прерывания, может никогда не прекратиться.

Таким образом, только задачи, которые проверяют Thread#isInterrupted() на регулярной основе (например, в цикле while (!Thread.currentThread().isInterrupted()) или что-то еще), будут прекращается. Но если вы не проверяете это в своей задаче, он все равно будет продолжать работать.