Java Thread Pools/Executor Service и wait() s - что происходит с цепочкой потоков и задач?
Я огляделся, но не нашел ответа, поэтому я хотел подтвердить это наверняка.
Скажем, у меня есть пул потоков фиксированного размера - ExecutorService pool = Executors.newFixedThreadPool(5);
И у меня есть код:
pool.execute(new Runnable(){
try{
Object waitForMe = doSomethingAndGetObjectToWaitFor();
waitForMe.wait();
doSomethingElse();
}catch(Exception e){ throw new RunTimeException(e) }
});
Предположим, что вышеупомянутый код называется несколько раз. В пуле всего 5 потоков (так что только 5 из приведенных выше заявлений должны быть в режиме реального времени в одной точке). Также предположим, что wait()
находится на объекте, выполняющем некоторые вызовы ввода-вывода на стороне thrid и ожидая обратного вызова, когда операция завершена, поэтому, естественно, потребуется некоторое время для завершения.
Теперь мой вопрос в том, что такое поведение, когда одна из этих задач достигает wait()
, выполняет ли задача спать, а затем поток из пула потоков отводит очередную задачу и запускает ее?
Если ожидаемая задача ложится спать, что происходит, когда она получает notify()
и просыпается? Возвращает ли поток в очередь (спереди или сзади) для пула потоков и ждет, пока один из 5 потоков не сможет продолжить его выполнение (т.е. Вызов doSomethingelse()
)? Или поток, который его выполнял, также переходит в спящий режим, т.е. Один из 5 потоков исполнителей сидит в ожидании с задачей (это то, что я предполагаю)? Или поток исполнителей поднимает другую задачу и просто прерывается, когда первая задача возвращается из wait()?
Ответы
Ответ 1
wait()
- операция блокировки:
Заставляет текущий поток ждать, пока другой поток вызовет метод notify() или notifyAll()
Это означает, что поток в пуле будет ждать, но извне он просто выглядит так, что текущая задача занимает столько времени, чтобы завершить. Это также означает, что если выполняются 5 задач и все они wait()
, Executor
не может обрабатывать оставшиеся задачи, которые, ekhem, ждут в очереди.
Правда, сам исполнительский поток переходит в спящий режим, позволяя другим потокам переключать и потреблять CPU (так что вы можете одновременно поддерживать сотни потоков, и ваша система все еще реагирует), но тем не менее поток "неприменим" и заблокирован.
Еще одна интересная функция - прерывание - если поток ждет чего-то или спит, вы можете прервать его. Обратите внимание, что оба wait()
и Thread.sleep()
объявляют InterruptedException
. С помощью ExecutorService
вы можете воспользоваться этим, просто позвонив: future.cancel()
(future
- это объект, который вы получили в ответ при отправке задачи на ExecutorService
).
Наконец, я думаю, вы должны пересмотреть свое решение. Вместо того, чтобы активно ждать завершения внешней системы, предоставить API обратные вызовы:
pool.execute(new Runnable(){
try{
doSomethingAndCallMeBackWhenItsDone(new Callback() {
public void done() {
doSomethingElse();
}
});
}catch(Exception e){ throw new RunTimeException(e) }
});
Таким образом API внешней системы будет просто уведомлять вас, когда результаты будут готовы, и вам не придется ждать и блокировать ExecutorService
. Наконец, если doSomethingElse()
занимает много времени, вы даже можете запланировать его, а не использовать внешний сторонний поток ввода-вывода:
pool.execute(new Runnable(){
try{
doSomethingAndCallMeBackWhenItIsDone(new Callback() {
public void done() {
pool.submit(new Callbale<Void>() {
public Void call() {
doSomethingElse();
}
}
}
});
}catch(Exception e){ throw new RunTimeException(e) }
});
UPDATE: вы спрашиваете, что делать с таймаутами? Вот моя идея:
pool.execute(new Runnable(){
try{
doSomethingAndCallMeBackWhenItsDone(new Callback() {
public void done() {
doSomethingElse();
}
public void timeout() {
//opps!
}
});
}catch(Exception e){ throw new RunTimeException(e) }
});
Я предполагаю, что вы можете реализовать тайм-аут на стороне третьей стороны, и если там происходит тайм-аут, просто вызовите метод timeout()
.
Ответ 2
wait()
ничего не знает о пуле протектора. И пул потоков ничего не знает о wait()
. Поэтому они никак не могут взаимодействовать.
Они работают как обычно - wait()
просто длинная операция блокировки, пул потоков - это всего лишь очередь runnables для работы в ограниченном пуле потоков.
Ответ 3
Я бы прокомментировал ответ Томаша, но моя репутация не позволяет (пока), извините.
Я знаю, что вопрос старый, но для людей, которые все еще читают эту страницу, посмотрите на будущее и особенно на guava ListenableFuture, который позволяет вам регистрировать обратные вызовы и цепочку будущего вместе, точно, цель не блокировать поток (и тем самым освобождать поток обратно в пул для чего-то другого, чтобы использовать его).
Ответ 4
Все 5 потоков будут заблокированы, и приложение будет в непроизводственном состоянии.
Добавляя к Tomasz
ответ, я хотел бы реализовать механизм тайм-аута следующим образом.
Future<Long> futureResult = service.execute(myCallable);
Long result = null;
try{
result = futureResult.get(5000, TimeUnit.MILLISECONDS);
}catch(TimeoutException e){
System.out.println("Time out after 5 seconds");
futureResult.cancel(true);
}catch(InterruptedException ie){
System.out.println("Error: Interrupted");
}catch(ExecutionException ee){
System.out.println("Error: Execution interrupted");
}
Помимо TimeoutException
, вы можете отменить Future во время InterruptedException & ExecutionException
. Если вы используете submit() вместо execute(), InterruptedException & ExecutionException
будет проглочен в самой структуре.