Используя Spring @Scheduled и @Async вместе
Вот мой прецедент.
Устаревшая система обновляет таблицу очередей базы данных QUEUE.
Мне нужна запланированная повторяющаяся работа, которая
- проверяет содержимое QUEUE
- если в таблице есть строки, они блокируют строку и выполняют некоторую работу
- удаляет строку в QUEUE
Если предыдущее задание все еще запущено, то для выполнения этой работы будет создан новый поток. Я хочу настроить максимальное количество одновременных потоков.
Я использую Spring 3, и мое текущее решение состоит в том, чтобы сделать следующее (используя fixedRate 1 миллисекунды, чтобы потоки выполнялись в основном непрерывно)
@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
log.debug("Start schedule");
publishWorker.start();
log.debug("End schedule");
}
<task:executor id="workerExecutor" pool-size="4" />
Это создало 4 потока прямо, и потоки правильно распределяли рабочую нагрузку из очереди. Однако я, кажется, получаю утечку памяти, когда потоки занимают много времени.
java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0 | 80 | 373,410,496 | 89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940 | 48 | 373,410,136 | 89.74%
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68
Итак,
1: Должен ли я использовать @Async и @Scheduled вместе?
2: Если нет, то как еще я могу использовать Spring для выполнения моих требований?
3: Как создать новые потоки только тогда, когда другие потоки заняты?
Спасибо всем!
EDIT: Я думаю, что очередь заданий становилась бесконечно длинной... Теперь, используя
<task:executor id="workerExecutor"
pool-size="1-4"
queue-capacity="10" rejection-policy="DISCARD" />
Отчитается с результатами
Ответы
Ответ 1
Вы можете попробовать
- Запустите планировщик с задержкой в одну секунду, которая будет блокировать и извлекать все
Записи QUEUE, которые до сих пор не заблокированы.
- Для каждой записи вызовите метод Async, который обработает эту запись и удалит ее.
- Политика отклонения исполнителя должна быть ABORT, так что планировщик может разблокировать QUEUE, которые еще не выданы для обработки. Таким образом, планировщик может снова обработать эти QUEUE в следующем прогоне.
Конечно, вам придется обрабатывать сценарий, когда планировщик заблокировал QUEUE, но обработчик не закончил обработку по какой-либо причине.
Псевдокод:
public class QueueScheduler {
@AutoWired
private QueueHandler queueHandler;
@Scheduled(fixedDelay = 1000)
public void doSchedule() throws InterruptedException {
log.debug("Start schedule");
List<Long> queueIds = lockAndFetchAllUnlockedQueues();
for (long id : queueIds)
queueHandler.process(id);
log.debug("End schedule");
}
}
public class QueueHandler {
@Async
public void process(long queueId) {
// process the QUEUE & delete it from DB
}
}
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10"
rejection-policy="ABORT"/>
Ответ 2
//using a fixedRate of 1 millisecond to get the threads to run basically continuously
@Scheduled(fixedRate = 1)
Когда вы используете @Scheduled
, будет создан новый поток и вызовет метод doSchedule
в указанном fixedRate с 1 миллисекундой. При запуске приложения вы уже можете увидеть 4 потока, конкурирующих за таблицу QUEUE и, возможно, мертвую блокировку.
Исследуйте, есть ли тупик, принимая дамп потока.
http://helpx.adobe.com/cq/kb/TakeThreadDump.html
Аннотации @Async здесь не будут использоваться.
Лучший способ реализовать это - создать класс как поток, выполнив runnable и передав ваш класс в TaskExecutor
с нужным количеством потоков.
Используя Spring threading и TaskExecutor, как узнать, когда поток завершен?
Также проверьте свой дизайн, он, похоже, не правильно обрабатывает синхронизацию. Если предыдущее задание выполняется и удерживает блокировку в строке, следующее задание, которое вы создаете, будет по-прежнему видеть эту строку и будет ожидать получения блокировки в этой конкретной строке.