Spring пакетное задание Чтение из нескольких источников
Как я могу читать элементы из баз данных с множественными значениями? Я уже знаю, что это возможно из файлов.
следующий пример работает для чтения из многократных файлов
...
<job id="readMultiFileJob" xmlns="http://www.springframework.org/schema/batch">
<step id="step1">
<tasklet>
<chunk reader="multiResourceReader" writer="flatFileItemWriter"
commit-interval="1" />
</tasklet>
</step>
</job>
...
<bean id="multiResourceReader"
class=" org.springframework.batch.item.file.MultiResourceItemReader">
<property name="resources" value="file:csv/inputs/domain-*.csv" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
...
три beans, как это.
<bean id="database2" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="name" value="database2Reader" />
<property name="dataSource" ref="dataSource2" />
<property name="sql" value="select image from object where image like '%/images/%'" />
<property name="rowMapper">
<bean class="sym.batch.ImagesRowMapper2" />
</property>
</bean>
Ответы
Ответ 1
Нет готового к использованию компонента, который выполняет то, что вы просите; единственное решение - написать пользовательский ItemReader<>
, который делегирует JdbcCursorItemReader
(или HibernateCursorItemReader
или любую универсальную реализацию ItemReader
).
Вам необходимо подготовить все необходимые материалы (источник данных, сеанс, настоящие читатели базы данных) и связать всех делегированных читателей с вашим пользовательским читателем.
EDIT:
Вам нужно смоделировать цикл, используя рекурсию ItemReader.read()
, и считыватель mantain и делегирует состояние через перезагрузки задания.
class MyItemReader<T> implements ItemReader<T>, ItemStream {
private ItemReader[] delegates;
private int delegateIndex;
private ItemReader<T> currentDelegate;
private ExecutionContext stepExecutionContext;
public void setDelegates(ItemReader[] delegates) {
this.delegates = delegates;
}
@BeforeStep
private void beforeStep(StepExecution stepExecution) {
this.stepExecutionContext = stepExecution.getExecutionContext();
}
public T read() {
T item = null;
if(null != currentDelegate) {
item = currentDelegate.read();
if(null == item) {
((ItemStream)this.currentDelegate).close();
this.currentDelegate = null;
}
}
// Move to next delegate if previous was exhausted!
if(null == item && this.delegateIndex< this.delegates.length) {
this.currentDelegate = this.delegates[this.currentIndex++];
((ItemStream)this.currentDelegate).open(this.stepExecutionContext);
update(this.stepExecutionContext);
// Recurse to read() to simulate loop through delegates
item = read();
}
return item;
}
public void open(ExecutionContext ctx) {
// During open restore last active reader and restore its state
if(ctx.containsKey("index")) {
this.delegateIndex = ctx.getInt("index");
this.currentDelegate = this.delegates[this.delegateIndex];
((ItemStream)this.currentDelegate ).open(ctx);
}
}
public void update(ExecutionContext ctx) {
// Update current delegate index and state
ctx.putInt("index", this.delegateIndex);
if(null != this.currentDelegate) {
((ItemStream)this.currentDelegate).update(ctx);
}
}
public void close(ExecutionContext ctx) {
if(null != this.currentDelegate) {
((ItemStream)this.currentDelegate).close();
}
}
<bean id="myItemReader" class=path.to.MyItemReader>
<property name="delegates">
<array>
<ref bean="itemReader1"/>
<ref bean="itemReader2"/>
<ref bean="itemReader3"/>
</array>
</property>
</bean>
EDIT2: Не забудьте установить свойство name; это НЕОБХОДИМО, чтобы MyItemReader.read() работал правильно
<bean id="itemReader1" class="JdbcCursorItemReader">
<property name="name" value="itemReader1" />
<!-- Set other properties -->
</bean>
Ответ 2
Я предлагаю простой обходной путь, который может не подходить для всех случаев, но будет полезен во многих случаях:
Просто определите:
- 2 считывателя, по одному для каждой базы данных
- 2 шага
- одно задание, содержащее два шага
Два шага почти идентичны, они ссылаются на один и тот же процессор и писатель, но у них разные читатели. Они будут называться последовательно.
Будет ли эта настройка работать, будет зависеть от процессора и записывающего устройства (независимо от того, работают ли они по-разному, когда вызывается на разных этапах). В моем случае достаточно было установить appendAllowed=true
для записи, так что оба действия могут записываться в один и тот же файл.
Ответ 3
Я предлагаю сложный путь. Если мы предположим, что одна ваша таблица данных datasource mysql является базовой, и каждая строка в этой таблице соответствует другой строке таблицы данных datasource mysql (например, таблицы объединений, которые находятся в разных источниках данных), вы можете сделать это в своем рабочем элементе обработки заданий. Пример этого пути;
Spring Конфигурация DataSource;
<bean id="mySqlDataSource1" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="${database1.driverClassName}"/>
<property name="url" value="${database1.url}"/>
<property name="username" value="${database1.username}"/>
<property name="password" value="${database1.password}"/>
<property name="validationQuery" value="${database1.validationQuery}"/>
</bean>
<bean id="mySqlDataSource2" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="${database2.driverClassName}"/>
<property name="url" value="${database2.url}"/>
<property name="username" value="${database2.username}"/>
<property name="password" value="${database2.password}"/>
<property name="validationQuery" value="${database2.validationQuery}"/>
</bean>
Ваш пакет-job.xml
<bean id="multiDatasorceReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="mySqlDataSource1" />
<property name="rowMapper" ref="multiDatasourceRowMapper" />
<property name="sql">
<value>
SELECT * FROM xyz
</value>
</property>
</bean>
<bean id="multiDatasourceRowMapper" class="yourpackage.MultiDatasourceRowMapper" scope="step">
<property name="secondDataSource" ref="mySqlDataSource2" />
<property name="secondSql">
<value>
SELECT * FROM abc
</value>
</property>
</bean>
Ваш RowMapper выглядит следующим образом:
public class MultiDatasourceRowMapper implements RowMapper<String> {
private DataSource secondDataSource;
private String secondSql;
public String mapRow(ResultSet rs, int arg1) throws SQLException {
Connection conn = secondDataSource.getConnection();
PreparedStatement prep = conn.prepareStatement(secondSql);
// Do Something
return "";
}
public void setSecondDataSource(DataSource secondDataSource) {
this.secondDataSource = secondDataSource;
}
public void setSecondSql(String secondSql) {
this.secondSql = secondSql;
}
}