Ожидание завершения всех потоков в Spring Интеграция

У меня есть самозаписывающаяся программа jar, которая в значительной степени зависит от интеграции Spring. Проблема, с которой я столкнулась, заключается в том, что программа завершается до того, как закончится .

Ниже приведена сокращенная версия кода, который я использую, я могу предоставить больше кода/конфигурации, если это необходимо. Точкой входа является метод main(), который загружает Spring и запускает процесс импорта:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

DataImporter содержит простой цикл, который передает сообщения в шлюз интеграции Spring. Это обеспечивает активный "толчок" подход к потоку, а не общий подход к опросу данных. Вот где моя проблема:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

Для полноты поток XML выглядит примерно так:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

Поток запускается и обрабатывает элементы эффективно, но как только цикл startImport() заканчивается, основной поток завершается и сразу же сбрасывает все потоки интеграции Spring. Это приводит к условию гонки, последние (n) элементы не полностью обрабатываются, когда программа завершается.

У меня есть идея поддержания ссылочного счета элементов, которые я обрабатываю, но это оказывается довольно сложным, поскольку поток часто расщепляет/маршрутизирует сообщения нескольким активаторам службы - это означает, что трудно определить, элемент "завершен".

Мне кажется, что мне нужен какой-то способ либо проверить, что no Spring beans все еще выполняется, либо отметить, что все элементы, отправленные на шлюз, были полностью обработаны до завершения.

Мой вопрос: как я могу это сделать, или есть лучший подход к моей проблеме, о которой я не думал?

Ответы

Ответ 1

Здесь вы не используете шаблон запроса-ответа.

исходящий канал-адаптер - это действие "огонь" и "забой", если вы хотите дождаться ответа, вы должны использовать исходящий шлюз, который будет ждать ответа, и подключить ответ к исходному шлюзу, а затем в java sendAndReceive not просто опубликуйте.

Ответ 2

Если вы можете определить Item, нужно ли это или нет (processingFinished() или что-то подобное, выполняемое на промежуточных этапах), вы можете зарегистрировать все Item в центральном органе, который отслеживает количество незавершенных Item и effecitvely определяет условие завершения.

Если этот подход возможен, вы даже можете подумать о том, чтобы упаковать элементы в FutureTask -объекты или использовать похожие понятия из java.util.concurrent.

Изменить: Вторая идея:

Думали ли вы о том, чтобы сделать каналы более умными? Отправитель закрывает канал, если он не отправляет больше данных. В этом случае рабочий- beans не должен быть потоками деамонов, но может определять свой критерий завершения, основанный на закрытом и пустом канале ввода.