Как обрабатывать логически связанные строки после ItemReader в SpringBatch?

Сценарий

Чтобы сделать его простым, предположим, что у меня есть ItemReader, который возвращает мне 25 строк.

  • Первые 10 строк принадлежат ученику A

  • Следующие 5 принадлежат учащемуся B

  • а остальные 10 принадлежат учащемуся C

Я хочу объединить их вместе логически, скажем studentId и сгладить их , чтобы в итоге одна строка на каждого учащегося.

Проблема

Если я правильно понимаю, установка интервала фиксации на 5 сделает следующее:

  • Отправьте 5 строк процессору (который будет агрегировать их или сделать любую бизнес-логику, о которой я расскажу).
  • После обработки будет записано 5 строк.
  • Затем он сделает это снова для следующих 5 строк и т.д.

Если это правда, то в течение следующих пяти я буду проверять уже написанные, вытащить их из совокупности в те, которые я сейчас обрабатываю, и писать их снова.

Я лично этого не делаю.

  • Какова наилучшая практика для обработки такой ситуации в Spring Batch?

Alternative

Иногда я чувствую, что гораздо проще написать обычную основную программу Spring JDBC, а затем я полностью контролирую, что я хочу делать. Тем не менее, я хотел воспользоваться преимуществами мониторинга состояния работы хранилища данных, возможностью перезапуска, пропустить, работу и прослушиватели шагов....

Мой Spring пакетный код

Мой module-context.xml

   <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <description>Example job to get you started. It provides a skeleton for a typical batch application.</description>

    <batch:job id="job1">
        <batch:step id="step1"  >           
            <batch:tasklet transaction-manager="transactionManager" start-limit="100" >             
                 <batch:chunk reader="attendanceItemReader"
                              processor="attendanceProcessor" 
                              writer="attendanceItemWriter" 
                              commit-interval="10" 
                 />

            </batch:tasklet>
        </batch:step>
    </batch:job> 

    <bean id="attendanceItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> 
        <property name="dataSource">
            <ref bean="sourceDataSource"/>
        </property> 
        <property name="sql"                                                    
                  value="select s.student_name ,s.student_id ,fas.attendance_days ,fas.attendance_value from K12INTEL_DW.ftbl_attendance_stumonabssum fas inner join k12intel_dw.dtbl_students s on fas.student_key = s.student_key inner join K12INTEL_DW.dtbl_schools ds on fas.school_key = ds.school_key inner join k12intel_dw.dtbl_school_dates dsd on fas.school_dates_key = dsd.school_dates_key where dsd.rolling_local_school_yr_number = 0 and ds.school_code = ? and s.student_activity_indicator = 'Active' and fas.LOCAL_GRADING_PERIOD = 'G1' and s.student_current_grade_level = 'Gr 9' order by s.student_id"/>
        <property name="preparedStatementSetter" ref="attendanceStatementSetter"/>           
        <property name="rowMapper" ref="attendanceRowMapper"/> 
    </bean> 

    <bean id="attendanceStatementSetter" class="edu.kdc.visioncards.preparedstatements.AttendanceStatementSetter"/>

    <bean id="attendanceRowMapper" class="edu.kdc.visioncards.rowmapper.AttendanceRowMapper"/>

    <bean id="attendanceProcessor" class="edu.kdc.visioncards.AttendanceProcessor" />  

    <bean id="attendanceItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
        <property name="resource" value="file:target/outputs/passthrough.txt"/> 
        <property name="lineAggregator"> 
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
        </property> 
    </bean> 

</beans>

Мои поддерживающие классы для Reader.

A PreparedStatementSetter

package edu.kdc.visioncards.preparedstatements;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class AttendanceStatementSetter implements PreparedStatementSetter {

    public void setValues(PreparedStatement ps) throws SQLException {

        ps.setInt(1, 7);

    }

}

и RowMapper

package edu.kdc.visioncards.rowmapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceRowMapper<T> implements RowMapper<AttendanceDTO> {

    public static final String STUDENT_NAME = "STUDENT_NAME";
    public static final String STUDENT_ID = "STUDENT_ID";
    public static final String ATTENDANCE_DAYS = "ATTENDANCE_DAYS";
    public static final String ATTENDANCE_VALUE = "ATTENDANCE_VALUE";

    public AttendanceDTO mapRow(ResultSet rs, int rowNum) throws SQLException {

        AttendanceDTO dto = new AttendanceDTO();
        dto.setStudentId(rs.getString(STUDENT_ID));
        dto.setStudentName(rs.getString(STUDENT_NAME));
        dto.setAttDays(rs.getInt(ATTENDANCE_DAYS));
        dto.setAttValue(rs.getInt(ATTENDANCE_VALUE));

        return dto;
    }
}

Мой процессор

package edu.kdc.visioncards;

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.item.ItemProcessor;

import edu.kdc.visioncards.dto.AttendanceDTO;

public class AttendanceProcessor implements ItemProcessor<AttendanceDTO, Map<Integer, AttendanceDTO>> {

    private Map<Integer, AttendanceDTO> map = new HashMap<Integer, AttendanceDTO>();

    public Map<Integer, AttendanceDTO> process(AttendanceDTO dto) throws Exception {

        if(map.containsKey(new Integer(dto.getStudentId()))){

            AttendanceDTO attDto = (AttendanceDTO)map.get(new Integer(dto.getStudentId()));
            attDto.setAttDays(attDto.getAttDays() + dto.getAttDays());
            attDto.setAttValue(attDto.getAttValue() + dto.getAttValue());

        }else{
            map.put(new Integer(dto.getStudentId()), dto);
        }
        return map;
    }

}

Мои проблемы из кода выше

В Процессе я создаю HashMap, и когда я обрабатываю строки, я проверяю, есть ли у меня этот Студент на Карте, если он там не добавлен. Если он уже там, я захватил его, получая значения, которые меня интересуют, и добавьте их в строку, которую я сейчас обрабатываю.

После этого Spring Batch Framework записывает файл в соответствии с моей конфигурацией

Мой вопрос таков:

  • Я не хочу, чтобы он пошел к писателю. Я хочу обработать все остальные строки. Как сохранить эту Карту, которую я создал в памяти для следующего набора строк, которые нужно пройти через этот же Процессор? Каждый раз строка обрабатывается через AttendanceProcessor, и карта инициализируется. Должен ли я помещать инициализацию карты в статический блок?

Ответы

Ответ 1

Я всегда следую этому шаблону:

  • Я делаю свой читательский охват "шагом", а в @PostConstruct я получаю результаты и поместите их на карту.
  • В процессоре я конвертирую связанный набор в список для записи, и отправить список для записи
  • В ItemWriter я сохраняю записываемый элемент в зависимости от случая

Ответ 2

В моем приложении я создал CollectingJdbcCursorItemReader, который расширяет стандартный JdbcCursorItemReader и выполняет именно то, что вам нужно. Внутри он использует мой CollectingRowMapper: расширение стандартного RowMapper, который сопоставляет несколько связанных строк с одним объектом.

Вот код ItemReader, код интерфейса CollectingRowMapper и его абстрактная реализация, доступен в другом ответе.

p >
import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.jdbc.core.RowMapper;

/**
 * A JdbcCursorItemReader that uses a {@link CollectingRowMapper}.
 * Like the superclass this reader is not thread-safe.
 * 
 * @author Pino Navato
 **/
public class CollectingJdbcCursorItemReader<T> extends JdbcCursorItemReader<T> {

    private CollectingRowMapper<T> rowMapper;
    private boolean firstRead = true;


    /**
     * Accepts a {@link CollectingRowMapper} only.
     **/
    @Override
    public void setRowMapper(RowMapper<T> rowMapper) {
        this.rowMapper = (CollectingRowMapper<T>)rowMapper;
        super.setRowMapper(rowMapper);
     }


    /**
     * Read next row and map it to item.
     **/
    @Override
    protected T doRead() throws Exception {
        if (rs == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        try {
            if (firstRead) {
                if (!rs.next()) {  //Subsequent calls to next() will be executed by rowMapper
                    return null;
                }
                firstRead = false;
            } else if (!rowMapper.hasNext()) {
                return null;
            }
            T item = readCursor(rs, getCurrentItemCount());
            return item;
        }
        catch (SQLException se) {
            throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se);
        }
    }

    @Override
    protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
        T result = super.readCursor(rs, currentRow);
        setCurrentItemCount(rs.getRow());
        return result;
    }

}

Вы можете использовать его так же, как классический JdbcCursorItemReader: единственное требование - предоставить ему CollectingRowMapper вместо классического RowMapper.

Ответ 3

в основном вы говорите об пакетной обработке с меняющимися идентификаторами (1), где пакет должен отслеживать изменение

для spring/spring-batch, о котором мы говорим:

  • ItemWriter, который проверяет список элементов для изменения идентификатора
  • перед изменением элементы хранятся во временном хранилище данных (2) (список, карта, что угодно) и не записываются
  • при изменении идентификатора бизнес-код агрегирования/выравнивания выполняется на элементах хранилища данных, и один элемент должен быть написан, теперь хранилище данных может использоваться для следующих элементов со следующим идентификатором
  • Эта концепция нуждается в читателе, который сообщает шаг "Я исчерпан", чтобы правильно очистить временный хранилище данных по концам элементов (файл/база данных).

здесь пример грубого и простого кода

@Override
public void write(List<? extends SimpleItem> items) throws Exception {

    // setup with first sharedId at startup
    if (currentId == null){
        currentId = items.get(0).getSharedId();
    }

    // check for change of sharedId in input
    // keep items in temporary dataStore until id change of input
    // call delegate if there is an id change or if the reader is exhausted
    for (SimpleItem item : items) {
        // already known sharedId, add to tempData
        if (item.getSharedId() == currentId) {
            tempData.add(item);
        } else {
            // or new sharedId, write tempData, empty it, keep new id
            // the delegate does the flattening/aggregating
            delegate.write(tempData);
            tempData.clear();
            currentId = item.getSharedId();
            tempData.add(item);
        }
    }

    // check if reader is exhausted, flush tempData
    if ((Boolean) stepExecution.getExecutionContext().get("readerExhausted")
            && tempData.size() > 0) {
        delegate.write(tempData);
        // optional delegate.clear(); 
    }
}

(1) при условии, что элементы упорядочены по идентификатору (также может быть составным)

(2) hashmap spring bean для безопасности потоков

Ответ 4

потому что вы изменили свой вопрос, я добавлю новый ответ

если ученики заказываются, тогда нет необходимости в списке/карте, вы можете использовать ровно один объект studentObject на процессоре, чтобы сохранить "текущий" и заполнить его, пока не появится новый (read: id change)

если ученики не заказаны, вы никогда не узнаете, когда конкретный ученик "закончен", и вам нужно будет держать всех учащихся на карте, которая не может быть записана до конца полной последовательности чтения

Остерегайтесь:

  • процессор должен знать, когда читатель исчерпан
  • его трудно заставить работать с любой концепцией фиксации и "id" , если вы объединяете элементы, которые так или иначе идентичны, процессор просто не может знать, является ли текущий обработанный элемент последним
  • в основном, usecase либо полностью решается на уровне читателя, либо на уровне писателя (см. другой ответ).
private SimpleItem currentItem;
private StepExecution stepExecution;

@Override
public SimpleItem process(SimpleItem newItem) throws Exception {
    SimpleItem returnItem = null;

    if (currentItem == null) {
        currentItem = new SimpleItem(newItem.getId(), newItem.getValue());
    } else if (currentItem.getId() == newItem.getId()) {
        // aggregate somehow
        String value = currentItem.getValue() + newItem.getValue();
        currentItem.setValue(value);
    } else {
        // "clone"/copy currentItem
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
        // replace currentItem
        currentItem = newItem;
    }

    // reader exhausted?
    if(stepExecution.getExecutionContext().containsKey("readerExhausted")
            && (Boolean)stepExecution.getExecutionContext().get("readerExhausted")
            && currentItem.getId() == stepExecution.getExecutionContext().getInt("lastItemId")) {
        returnItem = new SimpleItem(currentItem.getId(), currentItem.getValue());
    }

    return returnItem;
}

Ответ 5

Используйте "Слушатель выполнения шагов" и сохраните записи как карту в StepExecutionContext, затем вы можете сгруппировать их в прослушиватель сценариев или писателей и записать их за раз