Как обрабатывать логически связанные строки после ItemReader в SpringBatch?
Сценарий
Чтобы сделать его простым, предположим, что у меня есть ItemReader, который возвращает мне 25 строк.
-
Первые 10 строк принадлежат ученику A
-
Следующие 5 принадлежат учащемуся B
-
а остальные 10 принадлежат учащемуся C
Я хочу объединить их вместе логически, скажем studentId и сгладить их , чтобы в итоге одна строка на каждого учащегося.
Проблема
Если я правильно понимаю, установка интервала фиксации на 5 сделает следующее:
- Отправьте 5 строк процессору (который будет агрегировать их или сделать любую бизнес-логику, о которой я расскажу).
- После обработки будет записано 5 строк.
- Затем он сделает это снова для следующих 5 строк и т.д.
Если это правда, то в течение следующих пяти я буду проверять уже написанные, вытащить их из совокупности в те, которые я сейчас обрабатываю, и писать их снова.
Я лично этого не делаю.
- Какова наилучшая практика для обработки такой ситуации в Spring Batch?
Alternative
Иногда я чувствую, что гораздо проще написать обычную основную программу Spring JDBC, а затем я полностью контролирую, что я хочу делать. Тем не менее, я хотел воспользоваться преимуществами мониторинга состояния работы хранилища данных, возможностью перезапуска, пропустить, работу и прослушиватели шагов....
Мой Spring пакетный код
Мой module-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
<description>Example job to get you started. It provides a skeleton for a typical batch application.</description>
<batch:job id="job1">
<batch:step id="step1" >
<batch:tasklet transaction-manager="transactionManager" start-limit="100" >
<batch:chunk reader="attendanceItemReader"
processor="attendanceProcessor"
writer="attendanceItemWriter"
commit-interval="10"
/>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
<property name="dataSource">
<ref bean="sourceDataSource"/>
</property>
<property name="sql"
value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
<property name="preparedStatementSetter" ref="attendanceStatementSetter"/>
<property name="rowMapper" ref="attendanceRowMapper"/>
</bean>
<bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>
<bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>
<bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />
<bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" value="file:target/outputs/passthrough.txt"/>
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
</property>
</bean>
</beans>
Мои поддерживающие классы для Reader.
A PreparedStatementSetter
package edu.kdc.visioncards.preparedstatements;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.jdbc.core.PreparedStatementSetter;
public class AttendanceStatementSetter implements PreparedStatementSetter {
public void setValues(PreparedStatement ps) throws SQLException {
ps.setInt(1, 7);
}
}
и RowMapper
package edu.kdc.visioncards.rowmapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import edu.kdc.visioncards.dto.AttendanceDTO;
public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {
public static final String STUDENT_NAME = "STUDENT_NAME";
public static final String STUDENT_ID = "STUDENT_ID";
public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";
public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {
AttendanceDTO dto = new AttendanceDTO();
dto.setStudentId(rs.getString(STUDENT_ID));
dto.setStudentName(rs.getString(STUDENT_NAME));
dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));
return dto;
}
}
Мой процессор
package edu.kdc.visioncards;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.item.ItemProcessor;
import edu.kdc.visioncards.dto.AttendanceDTO;
public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {
private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();
public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {
if(map.containsKey(new Integer(dto.getStudentId()))){
AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());
}else{
map.put(new Integer(dto.getStudentId()), dto);
}
return map;
}
}
Мои проблемы из кода выше
В Процессе я создаю HashMap, и когда я обрабатываю строки, я проверяю, есть ли у меня этот Студент на Карте, если он там не добавлен. Если он уже там, я захватил его, получая значения, которые меня интересуют, и добавьте их в строку, которую я сейчас обрабатываю.
После этого Spring Batch Framework записывает файл в соответствии с моей конфигурацией
Мой вопрос таков:
- Я не хочу, чтобы он пошел к писателю. Я хочу обработать все остальные строки. Как сохранить эту Карту, которую я создал в памяти для следующего набора строк, которые нужно пройти через этот же Процессор? Каждый раз строка обрабатывается через AttendanceProcessor, и карта инициализируется. Должен ли я помещать инициализацию карты в статический блок?
Ответы
Ответ 1
Я всегда следую этому шаблону:
- Я делаю свой читательский охват "шагом", а в @PostConstruct я получаю
результаты и поместите их на карту.
- В процессоре я конвертирую связанный набор в список для записи,
и отправить список для записи
- В ItemWriter я сохраняю записываемый элемент в зависимости от случая
Ответ 2
В моем приложении я создал CollectingJdbcCursorItemReader
, который расширяет стандартный JdbcCursorItemReader
и выполняет именно то, что вам нужно. Внутри он использует мой CollectingRowMapper
: расширение стандартного RowMapper
, который сопоставляет несколько связанных строк с одним объектом.
Вот код ItemReader, код интерфейса CollectingRowMapper
и его абстрактная реализация, доступен в другом ответе.
p > import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;
/**
* A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
* Like the superclass this reader is not thread-safe.
*
* @author Pino Navato
**/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {
private CollectingRowMapper<T> rowMapper;
private boolean firstRead = true;
/**
* Accepts a {@link CollectingRowMapper} only.
**/
@Override
public void setRowMapper(RowMapper<T> rowMapper) {
this.rowMapper = (CollectingRowMapper<T>)rowMapper;
super.setRowMapper(rowMapper);
}
/**
* Read next row and map it to item.
**/
@Override
protected T doRead() throws Exception {
if (rs == null) {
throw new ReaderNotOpenException("Reader must be open before it can be read.");
}
try {
if (firstRead) {
if (!rs.next()) { //Subsequent calls to next() will be executed by rowMapper
return null;
}
firstRead = false;
} else if (!rowMapper.hasNext()) {
return null;
}
T item = readCursor(rs, getCurrentItemCount());
return item;
}
catch (SQLException se) {
throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
}
}
@Override
protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
T result = super.readCursor(rs, currentRow);
setCurrentItemCount(rs.getRow());
return result;
}
}
Вы можете использовать его так же, как классический JdbcCursorItemReader
: единственное требование - предоставить ему CollectingRowMapper
вместо классического RowMapper
.
Ответ 3
в основном вы говорите об пакетной обработке с меняющимися идентификаторами (1), где пакет должен отслеживать изменение
для spring/spring-batch, о котором мы говорим:
- ItemWriter, который проверяет список элементов для изменения идентификатора
- перед изменением элементы хранятся во временном хранилище данных (2) (список, карта, что угодно) и не записываются
- при изменении идентификатора бизнес-код агрегирования/выравнивания выполняется на элементах хранилища данных, и один элемент должен быть написан, теперь хранилище данных может использоваться для следующих элементов со следующим идентификатором
- Эта концепция нуждается в читателе, который сообщает шаг "Я исчерпан", чтобы правильно очистить временный хранилище данных по концам элементов (файл/база данных).
здесь пример грубого и простого кода
@Override
public void write(List<? extends SimpleItem> items) throws Exception {
// setup with first sharedId at startup
if (currentId == null){
currentId = items.get(0).getSharedId();
}
// check for change of sharedId in input
// keep items in temporary dataStore until id change of input
// call delegate if there is an id change or if the reader is exhausted
for (SimpleItem item : items) {
// already known sharedId, add to tempData
if (item.getSharedId() == currentId) {
tempData.add(item);
} else {
// or new sharedId, write tempData, empty it, keep new id
// the delegate does the flattening/aggregating
delegate.write(tempData);
tempData.clear();
currentId = item.getSharedId();
tempData.add(item);
}
}
// check if reader is exhausted, flush tempData
if ((Boolean) stepExecution.getExecutionContext().get("readerExhausted")
&& tempData.size() > 0) {
delegate.write(tempData);
// optional delegate.clear();
}
}
(1) при условии, что элементы упорядочены по идентификатору (также может быть составным)
(2) hashmap spring bean для безопасности потоков
Ответ 4
потому что вы изменили свой вопрос, я добавлю новый ответ
если ученики заказываются, тогда нет необходимости в списке/карте, вы можете использовать ровно один объект studentObject на процессоре, чтобы сохранить "текущий" и заполнить его, пока не появится новый (read: id change)
если ученики не заказаны, вы никогда не узнаете, когда конкретный ученик "закончен", и вам нужно будет держать всех учащихся на карте, которая не может быть записана до конца полной последовательности чтения
Остерегайтесь:
- процессор должен знать, когда читатель исчерпан
- его трудно заставить работать с любой концепцией фиксации и "id" , если вы объединяете элементы, которые так или иначе идентичны, процессор просто не может знать, является ли текущий обработанный элемент последним
- в основном, usecase либо полностью решается на уровне читателя, либо на уровне писателя (см. другой ответ).
private SimpleItem currentItem;
private StepExecution stepExecution;
@Override
public SimpleItem process(SimpleItem newItem) throws Exception {
SimpleItem returnItem = null;
if (currentItem == null) {
currentItem = new SimpleItem(newItem.getId(), newItem.getValue());
} else if (currentItem.getId() == newItem.getId()) {
// aggregate somehow
String value = currentItem.getValue() + newItem.getValue();
currentItem.setValue(value);
} else {
// "clone"/copy currentItem
returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
// replace currentItem
currentItem = newItem;
}
// reader exhausted?
if(stepExecution.getExecutionContext().containsKey("readerExhausted")
&& (Boolean)stepExecution.getExecutionContext().get("readerExhausted")
&& currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) {
returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
}
return returnItem;
}
Ответ 5
Используйте "Слушатель выполнения шагов" и сохраните записи как карту в StepExecutionContext, затем вы можете сгруппировать их в прослушиватель сценариев или писателей и записать их за раз