Ожидание завершения всех потоков в Spring Интеграция
У меня есть самозаписывающаяся программа jar, которая в значительной степени зависит от интеграции Spring. Проблема, с которой я столкнулась, заключается в том, что программа завершается до того, как закончится .
Ниже приведена сокращенная версия кода, который я использую, я могу предоставить больше кода/конфигурации, если это необходимо. Точкой входа является метод main(), который загружает Spring и запускает процесс импорта:
public static void main(String[] args) {
ctx = new ClassPathXmlApplicationContext("flow.xml");
DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
try {
importer.startImport();
} catch (Exception e) {
e.printStackTrace();
} finally {
ctx.close();
}
}
DataImporter содержит простой цикл, который передает сообщения в шлюз интеграции Spring. Это обеспечивает активный "толчок" подход к потоку, а не общий подход к опросу данных. Вот где моя проблема:
public void startImport() throws Exception {
for (Item item : items) {
gatewayBean.publish(item);
Thread.sleep(200); // Yield period
}
}
Для полноты поток XML выглядит примерно так:
<gateway default-request-channel="inChannel" service-interface="GatewayBean" />
<splitter input-channel="inChannel" output-channel="splitChannel" />
<payload-type-router input-channel="splitChannel">
<mapping type="Item" channel="itemChannel" />
<mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>
<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />
Поток запускается и обрабатывает элементы эффективно, но как только цикл startImport() заканчивается, основной поток завершается и сразу же сбрасывает все потоки интеграции Spring. Это приводит к условию гонки, последние (n) элементы не полностью обрабатываются, когда программа завершается.
У меня есть идея поддержания ссылочного счета элементов, которые я обрабатываю, но это оказывается довольно сложным, поскольку поток часто расщепляет/маршрутизирует сообщения нескольким активаторам службы - это означает, что трудно определить, элемент "завершен".
Мне кажется, что мне нужен какой-то способ либо проверить, что no Spring beans все еще выполняется, либо отметить, что все элементы, отправленные на шлюз, были полностью обработаны до завершения.
Мой вопрос: как я могу это сделать, или есть лучший подход к моей проблеме, о которой я не думал?
Ответы
Ответ 1
Здесь вы не используете шаблон запроса-ответа.
исходящий канал-адаптер - это действие "огонь" и "забой", если вы хотите дождаться ответа, вы должны использовать исходящий шлюз, который будет ждать ответа, и подключить ответ к исходному шлюзу, а затем в java sendAndReceive not просто опубликуйте.
Ответ 2
Если вы можете определить Item
, нужно ли это или нет (processingFinished() или что-то подобное, выполняемое на промежуточных этапах), вы можете зарегистрировать все Item
в центральном органе, который отслеживает количество незавершенных Item
и effecitvely определяет условие завершения.
Если этот подход возможен, вы даже можете подумать о том, чтобы упаковать элементы в FutureTask
-объекты или использовать похожие понятия из java.util.concurrent
.
Изменить: Вторая идея:
Думали ли вы о том, чтобы сделать каналы более умными? Отправитель закрывает канал, если он не отправляет больше данных. В этом случае рабочий- beans не должен быть потоками деамонов, но может определять свой критерий завершения, основанный на закрытом и пустом канале ввода.