Как можно использовать threadpool после завершения работы
У меня есть .csv файл, содержащий более 70 миллионов строк, из которых каждая строка должна генерировать Runnable, а затем выполняется threadpool. Этот Runnable будет вставлять запись в Mysql.
Что еще, я хочу записать позицию файла csv для RandomAccessFile, чтобы найти. Позиция написана на Файл. Я хочу записать эту запись, когда все потоки в threadpool закончены. Вызывается ThreadPoolExecutor.shutdown(). Но когда приходит больше строк, мне снова нужен поток. Как я могу повторно использовать этот текущий поток, вместо того, чтобы создавать новый.
Код выглядит следующим образом:
public static boolean processPage() throws Exception {
long pos = getPosition();
long start = System.currentTimeMillis();
raf.seek(pos);
if(pos==0)
raf.readLine();
for (int i = 0; i < PAGESIZE; i++) {
String lineStr = raf.readLine();
if (lineStr == null)
return false;
String[] line = lineStr.split(",");
final ExperienceLogDO log = CsvExperienceLog.generateLog(line);
//System.out.println("userId: "+log.getUserId()%512);
pool.execute(new Runnable(){
public void run(){
try {
experienceService.insertExperienceLog(log);
} catch (BaseException e) {
e.printStackTrace();
}
}
});
long end = System.currentTimeMillis();
}
BufferedWriter resultWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(new File(
RESULT_FILENAME), true)));
resultWriter.write("\n");
resultWriter.write(String.valueOf(raf.getFilePointer()));
resultWriter.close();
long time = System.currentTimeMillis()-start;
System.out.println(time);
return true;
}
Спасибо!
Ответы
Ответ 1
Как указано в документации , вы не можете повторно использовать ExecutorService
, который был отключен. Я бы рекомендовал против любых обходных решений, поскольку (а) они могут работать не так, как ожидалось, во всех ситуациях; и (б) вы можете достичь того, что хотите, используя стандартные классы.
Вы должны либо
Первое решение легко реализуется, поэтому я не буду подробно его описывать.
Для второго, так как вы хотите выполнить действие, как только все поставленные задачи будут завершены, вы можете взглянуть на ExecutorCompletionService
и использовать его вместо этого. Он обертывает ExecutorService
, который будет управлять потоком, но runnables будут обернуты во что-то, что сообщит ExecutorCompletionService
, когда они закончат, чтобы он мог сообщить вам:
ExecutorService executor = ...;
ExecutorCompletionService ecs = new ExecutorCompletionService(executor);
for (int i = 0; i < totalTasks; i++) {
... ecs.submit(...); ...
}
for (int i = 0; i < totalTasks; i++) {
ecs.take();
}
Метод take()
в классе ExecutorCompletionService
будет блокироваться до завершения задачи (как обычно, так и внезапно). Он вернет a Future
, поэтому вы можете проверить результаты, если хотите.
Надеюсь, это может вам помочь, так как я не совсем понял вашу проблему.
Ответ 2
создать и сгруппировать все задачи и отправить их в пул с invokeAll (который возвращается только после успешного завершения всех задач)
Ответ 3
После вызова shutdown в ExecutorService новая задача не будет принята. Это необходимо для создания нового ExecutorService для каждого раунда задач.
Однако, с Java 8 ForkJoinPool.awaitQuiescence. Если вы можете переключиться с обычного ExecutorService на ForkJoinPool, вы можете использовать этот метод, чтобы ждать, пока в ForkJoinPool не будет запущено больше задач без необходимости выключения. Это означает, что вы можете заполнить ForkJoinPool с помощью задач, ожидая, пока он не станет пустым (quiescent), а затем снова начните заполнять его с помощью Задачи и т.д.