Обработка исключений для ThreadPoolExecutor
У меня есть следующий фрагмент кода, который в основном сканирует список задач, который должен быть выполнен, и каждая задача затем предоставляется исполнителю для выполнения.
JobExecutor
в свою очередь создает другого исполнителя (для выполнения db stuff... чтения и записи данных в очередь) и завершает задачу.
JobExecutor
возвращает Future<Boolean>
для поставленных задач. Когда одна из задач выходит из строя, я хочу изящно прервать все потоки и завершить работу исполнителя, перехватив все исключения. Какие изменения мне нужно сделать?
public class DataMovingClass {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();
ThreadPoolExecutor threadPoolExecutor = null ;
private List<Source> sources = new ArrayList<Source>();
private static class IDGenerator extends ThreadLocal<Integer> {
@Override
public Integer get() {
return uniqueId.incrementAndGet();
}
}
public void init(){
// load sources list
}
public boolean execute() {
boolean succcess = true ;
threadPoolExecutor = new ThreadPoolExecutor(10,10,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("DataMigration-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();
for (Source source : sources) {
result.add(threadPoolExecutor.submit(new JobExecutor(source)));
}
for (Future<Boolean> jobDone : result) {
try {
if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
// in case of successful DbWriterClass, we don't need to change
// it.
success = false;
}
} catch (Exception ex) {
// handle exceptions
}
}
}
public class JobExecutor implements Callable<Boolean> {
private ThreadPoolExecutor threadPoolExecutor ;
Source jobSource ;
public SourceJobExecutor(Source source) {
this.jobSource = source;
threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Job Executor-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
}
public Boolean call() throws Exception {
boolean status = true ;
System.out.println("Starting Job = " + jobSource.getName());
try {
// do the specified task ;
}catch (InterruptedException intrEx) {
logger.warn("InterruptedException", intrEx);
status = false ;
} catch(Exception e) {
logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
status = false ;
}
System.out.println("Ending Job = " + jobSource.getName());
return status ;
}
}
}
Ответы
Ответ 1
Когда вы отправляете задание исполнителю, он возвращает вам экземпляр FutureTask
.
FutureTask.get()
будет перебрасывать любое исключение, заданное задачей, как ExecutorException
.
Итак, когда вы выполняете итерацию через List<Future>
и вызываете get на каждом, ловите ExecutorException
и вызовите упорядоченное завершение работы.
Ответ 2
Подкласс ThreadPoolExecutor
и переопределить его метод protected afterExecute (Runnable r, Throwable t)
.
Если вы создаете пул потоков через класс удобства java.util.concurrent.Executors
(которого вы не знаете), взгляните на его источник, чтобы увидеть, как он вызывает ThreadPoolExecutor.
Ответ 3
Поскольку вы отправляете задания на ThreadPoolExecutor
, исключения проглатываются FutureTask
.
Посмотрите на это code
**Inside FutureTask$Sync**
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
protected void setException(Throwable t) {
sync.innerSetException(t);
}
Из кода выше видно, что метод setException
ловит Throwable
. По этой причине FutureTask
проглатывает все исключения, если вы используете метод submit()
на ThreadPoolExecutor
В соответствии с java документация вы можете расширить метод afterExecute()
в ThreadPoolExecutor
protected void afterExecute(Runnable r,
Throwable t)
Пример кода в соответствии с документацией:
class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
Вы можете поймать Exceptions
тремя способами
-
Future.get()
, как это было предложено в принятом ответе
- завершите весь метод
run()
или call()
в try{}catch{}Exceptoion{}
блоках
- переопределить
afterExecute
метода ThreadPoolExecutor
, как показано выше.
Чтобы изящно прервать другие темы, посмотрите ниже вопрос SE:
Как остановить следующий поток от запуска в ScheduledThreadPoolExecutor
Как принудительно завершить работу java ExecutorService