Как использовать invokeAll(), чтобы все пулы потоков выполняли свою задачу?
ExecutorService pool=Executors.newFixedThreadPool(7);
List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();
for(int i=0;i<=diff;i++){
String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);
callList.add(new HotelCheapestFare(str));
}
future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){
System.out.println("name is:"+future.get(i).get().getName());
}
Теперь я хочу, чтобы пул до invokeAll
выполнял всю задачу перед тем, как перейти в цикл for, но когда я запустил эту программу для цикла, выполняется до этого invokeAll
и выбрасывает это исключение:
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65)
Caused by: java.lang.NullPointerException at
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run
Ответы
Ответ 1
Способ работы ExecutorService
заключается в том, что при вызове invokeAll
он ожидает завершения всех задач:
Выполняет заданные задачи, возвращая список фьючерсов, содержащих их статус и результаты, когда все завершено. Future.isDone() истинно для каждого элемент возвращаемого списка. Обратите внимание, что завершенная задача может иметь либо нормально, либо путем исключения исключений. Результат этот метод undefined, если данная коллекция модифицирована, а эта операция выполняется. 1 (выделено мной)
Это означает, что все ваши задачи выполнены, но некоторые из них могут вызвать исключение. Это исключение является частью Future
- вызывающего get
, заставляет исключение заново завернуть в ExecutionException
.
От вас stacktrack
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
^^^ <-- from get
Вы можете видеть, что это действительно так. Одна из ваших задач потерпела неудачу с NPE. ExecutorService
поймал исключение и расскажет вам об этом, бросив ExecutionException
, когда вы вызываете Future.get
.
Теперь, если вы хотите выполнить задачи по мере их завершения, вам понадобится ExecutorCompletionService
. Это действует как BlockingQueue
, что позволит вам опросить задачи по мере их завершения.
public static void main(String[] args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
try {
final Future<String> myValue = completionService.take();
//do stuff with the Future
final String result = myValue.get();
System.out.println(result);
} catch (InterruptedException ex) {
return;
} catch (ExecutionException ex) {
System.err.println("TASK FAILED");
}
}
}
});
for (int i = 0; i < 100; ++i) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if (Math.random() > 0.5) {
throw new RuntimeException("FAILED");
}
return "SUCCESS";
}
});
}
executorService.shutdown();
}
В этом примере у меня есть одна задача, которая вызывает take
в ExecutorCompletionService
, которая получает Future
по мере их появления, а затем отправляю задачи в ExecutorCompletionService
.
Это позволит вам получить неудачную задачу, как только она потерпит неудачу, а не ждать, пока все задачи не сработают вместе.
Единственное осложнение состоит в том, что сложно сказать потоку опроса, что все задачи выполняются, поскольку все теперь асинхронно. В этом случае я использовал знание о том, что будет отправлено 100 заданий, так что ему нужно будет опросить 100 раз. Более общий способ состоял бы в том, чтобы собрать Future
из метода submit
, а затем зациклиться на них, чтобы увидеть, все ли выполнено.
Ответ 2
Future.get() выдает исключения.
CancellationException
- если вычисление было отменено
ExecutionException
- если вычисление выбрало исключение
InterruptedException
- если текущий поток был прерван во время ожидания
Поймать все эти исключения, когда вы вызываете метод get()
.
Я выполнял симуляцию деления на нулевое исключение для некоторых Callable
задач, но исключение в одном Callable
не влияет на другие задачи Callable
, отправленные на ExecutorService
, если вы ловите выше трех исключений, как показано в примере кода.
Пример фрагмента кода:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllUsage{
public InvokeAllUsage(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(10);
List<MyCallable> futureList = new ArrayList<MyCallable>();
for ( int i=0; i<10; i++){
MyCallable myCallable = new MyCallable((long)i+1);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
for(Future<Long> future : futures){
try{
System.out.println("future.isDone = " + future.isDone());
System.out.println("future: call ="+future.get());
}
catch (CancellationException ce) {
ce.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
}catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllUsage demo = new InvokeAllUsage();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
if ( id % 5 == 0){
id = id / 0;
}
return id;
}
}
}
выход:
creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Completed
Ответ 3
invokeAll - это метод блокировки. Это означает, что JVM не перейдет к следующей строке, пока все потоки не будут завершены. Поэтому я думаю, что что-то не так с вашим будущим результатом потока.
System.out.println("name is:"+future.get(i).get().getName());
из этой строки, я думаю, что некоторые фьючерсы не имеют результата и могут быть нулевыми. Поэтому вы должны проверить свой код, если есть некоторые фьючерсы null. Если это так, получите a, если перед выполнением этой строки.