Java Thread Pools/Executor Service и wait() s - что происходит с цепочкой потоков и задач?

Я огляделся, но не нашел ответа, поэтому я хотел подтвердить это наверняка.

Скажем, у меня есть пул потоков фиксированного размера - ExecutorService pool = Executors.newFixedThreadPool(5);

И у меня есть код:

pool.execute(new Runnable(){
    try{
        Object waitForMe = doSomethingAndGetObjectToWaitFor();
        waitForMe.wait();
        doSomethingElse();
    }catch(Exception e){ throw new RunTimeException(e) } 

});

Предположим, что вышеупомянутый код называется несколько раз. В пуле всего 5 потоков (так что только 5 из приведенных выше заявлений должны быть в режиме реального времени в одной точке). Также предположим, что wait() находится на объекте, выполняющем некоторые вызовы ввода-вывода на стороне thrid и ожидая обратного вызова, когда операция завершена, поэтому, естественно, потребуется некоторое время для завершения.

Теперь мой вопрос в том, что такое поведение, когда одна из этих задач достигает wait(), выполняет ли задача спать, а затем поток из пула потоков отводит очередную задачу и запускает ее?

Если ожидаемая задача ложится спать, что происходит, когда она получает notify() и просыпается? Возвращает ли поток в очередь (спереди или сзади) для пула потоков и ждет, пока один из 5 потоков не сможет продолжить его выполнение (т.е. Вызов doSomethingelse())? Или поток, который его выполнял, также переходит в спящий режим, т.е. Один из 5 потоков исполнителей сидит в ожидании с задачей (это то, что я предполагаю)? Или поток исполнителей поднимает другую задачу и просто прерывается, когда первая задача возвращается из wait()?

Ответы

Ответ 1

wait() - операция блокировки:

Заставляет текущий поток ждать, пока другой поток вызовет метод notify() или notifyAll()

Это означает, что поток в пуле будет ждать, но извне он просто выглядит так, что текущая задача занимает столько времени, чтобы завершить. Это также означает, что если выполняются 5 задач и все они wait(), Executor не может обрабатывать оставшиеся задачи, которые, ekhem, ждут в очереди.

Правда, сам исполнительский поток переходит в спящий режим, позволяя другим потокам переключать и потреблять CPU (так что вы можете одновременно поддерживать сотни потоков, и ваша система все еще реагирует), но тем не менее поток "неприменим" и заблокирован.

Еще одна интересная функция - прерывание - если поток ждет чего-то или спит, вы можете прервать его. Обратите внимание, что оба wait() и Thread.sleep() объявляют InterruptedException. С помощью ExecutorService вы можете воспользоваться этим, просто позвонив: future.cancel() (future - это объект, который вы получили в ответ при отправке задачи на ExecutorService).

Наконец, я думаю, вы должны пересмотреть свое решение. Вместо того, чтобы активно ждать завершения внешней системы, предоставить API обратные вызовы:

pool.execute(new Runnable(){
    try{
        doSomethingAndCallMeBackWhenItsDone(new Callback() {
            public void done() {
                doSomethingElse();
            }
        });
    }catch(Exception e){ throw new RunTimeException(e) } 

});

Таким образом API внешней системы будет просто уведомлять вас, когда результаты будут готовы, и вам не придется ждать и блокировать ExecutorService. Наконец, если doSomethingElse() занимает много времени, вы даже можете запланировать его, а не использовать внешний сторонний поток ввода-вывода:

pool.execute(new Runnable(){
    try{
        doSomethingAndCallMeBackWhenItIsDone(new Callback() {
            public void done() {
                pool.submit(new Callbale<Void>() {
                    public Void call() {
                        doSomethingElse();
                    }
                }
            }
        });
    }catch(Exception e){ throw new RunTimeException(e) } 

});

UPDATE: вы спрашиваете, что делать с таймаутами? Вот моя идея:

pool.execute(new Runnable(){
    try{
        doSomethingAndCallMeBackWhenItsDone(new Callback() {
            public void done() {
                doSomethingElse();
            }
            public void timeout() {
                //opps!
            }
        });
    }catch(Exception e){ throw new RunTimeException(e) } 

});

Я предполагаю, что вы можете реализовать тайм-аут на стороне третьей стороны, и если там происходит тайм-аут, просто вызовите метод timeout().

Ответ 2

wait() ничего не знает о пуле протектора. И пул потоков ничего не знает о wait(). Поэтому они никак не могут взаимодействовать.

Они работают как обычно - wait() просто длинная операция блокировки, пул потоков - это всего лишь очередь runnables для работы в ограниченном пуле потоков.

Ответ 3

Я бы прокомментировал ответ Томаша, но моя репутация не позволяет (пока), извините.

Я знаю, что вопрос старый, но для людей, которые все еще читают эту страницу, посмотрите на будущее и особенно на guava ListenableFuture, который позволяет вам регистрировать обратные вызовы и цепочку будущего вместе, точно, цель не блокировать поток (и тем самым освобождать поток обратно в пул для чего-то другого, чтобы использовать его).

Ответ 4

Все 5 потоков будут заблокированы, и приложение будет в непроизводственном состоянии.

Добавляя к Tomasz ответ, я хотел бы реализовать механизм тайм-аута следующим образом.

            Future<Long> futureResult = service.execute(myCallable);
            Long result = null;
            try{
                result = futureResult.get(5000, TimeUnit.MILLISECONDS);
            }catch(TimeoutException e){
                System.out.println("Time out after 5 seconds");
                futureResult.cancel(true);
            }catch(InterruptedException ie){
                System.out.println("Error: Interrupted");
            }catch(ExecutionException ee){
                System.out.println("Error: Execution interrupted");
            }

Помимо TimeoutException, вы можете отменить Future во время InterruptedException & ExecutionException. Если вы используете submit() вместо execute(), InterruptedException & ExecutionException будет проглочен в самой структуре.