С помощью Java ExecutorService, как я могу активно выполнять задачи, но останавливать обработку ожидающих задач?
Я использую ExecutorService (ThreadPoolExecutor) для запуска (и очереди) множества задач. Я пытаюсь написать некоторый код завершения, который является настолько изящным, насколько это возможно.
ExecutorService имеет два способа отключения:
- Я могу вызвать
ExecutorService.shutdown()
, а затем ExecutorService.awaitTermination(...)
.
- Я могу позвонить
ExecutorService.shutdownNow()
.
В соответствии с JavaDoc команда shutdown
:
Initiates an orderly shutdown in which previously submitted
tasks are executed, but no new tasks will be accepted.
И команда shutdownNow
:
Attempts to stop all actively executing tasks, halts the
processing of waiting tasks, and returns a list of the tasks that were
awaiting execution.
Мне нужно что-то между этими двумя параметрами.
Я хочу вызвать команду, которая:
а. Завершает текущую активную задачу или задачи (например, shutdown
).
б. Завершает обработку ожидающих задач (например, shutdownNow
).
Например: предположим, что у меня есть ThreadPoolExecutor с 3 потоками. В настоящее время у него 50 задач в очереди, при этом первые 3 активно работают. Я хочу, чтобы эти 3 активные задачи были завершены, но я не хочу, чтобы остальные 47 задач запускались.
Я считаю, что я могу завершить работу ExecutorService таким образом, сохранив список объектов Future
, а затем вызывая cancel
для всех них. Но поскольку задачи передаются в этот ExecutorService из нескольких потоков, не было бы чистого способа сделать это.
Я действительно надеюсь, что мне не хватает чего-то очевидного или что есть способ сделать это чисто.
Спасибо за любую помощь.
Ответы
Ответ 1
Недавно я столкнулся с этой проблемой. Может быть более элегантный подход, но мое решение состоит в том, чтобы сначала вызвать shutdown()
, затем вытащить BlockingQueue
, который используется ThreadPoolExecutor
, и вызвать clear()
на нем (или же слить его в другой Collection
для хранения). Наконец, вызов awaitTermination()
позволяет пулу потоков завершить то, что в данный момент находится на его пластине.
Например:
public static void shutdownPool(boolean awaitTermination) throws InterruptedException {
//call shutdown to prevent new tasks from being submitted
executor.shutdown();
//get a reference to the Queue
final BlockingQueue<Runnable> blockingQueue = executor.getQueue();
//clear the Queue
blockingQueue.clear();
//or else copy its contents here with a while loop and remove()
//wait for active tasks to be completed
if (awaitTermination) {
executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
}
Этот метод будет реализован в классе directating, который обертывает ThreadPoolExecutor
ссылкой executor
.
Важно отметить следующее из ThreadPoolExecutor.getQueue()
javadoc:
Доступ к очереди задач предназначен прежде всего для отладки и мониторинг. Эта очередь может активно использоваться. Получение очереди задач не препятствует выполнению задач, стоящих в очереди.
Это подчеркивает тот факт, что дополнительные задачи могут быть опрошены из BlockingQueue
, в то время как вы его истощаете. Тем не менее, все реализации BlockingQueue
являются потокобезопасными в соответствии с этой документацией интерфейса, поэтому это не должно вызывать проблем.
Ответ 2
Вы можете обернуть каждую отправленную задачу немного дополнительной логикой
wrapper = new Runnable()
public void run()
if(executorService.isShutdown())
throw new Error("shutdown");
task.run();
executorService.submit(wrapper);
накладные расходы дополнительной проверки незначительны. После того, как исполнитель выключится, обертки все равно будут выполнены, но исходные задачи не будут.
Ответ 3
shutdownNow()
- это именно то, что вам нужно. Вы пропустили первое слово "Попытки" и весь второй абзац javadoc:
Нет никаких гарантий, кроме лучших попыток прекратить обработку, активно выполняющую задачи. Например, типичные реализации будут отменены с помощью Thread.interrupt()
, поэтому любая задача, которая не отвечает на прерывания, может никогда не прекратиться.
Таким образом, только задачи, которые проверяют Thread#isInterrupted()
на регулярной основе (например, в цикле while (!Thread.currentThread().isInterrupted())
или что-то еще), будут прекращается. Но если вы не проверяете это в своей задаче, он все равно будет продолжать работать.