Spring Пакет: один читатель, несколько процессоров и писателей
В пакете Spring мне нужно передать элементы, прочитанные ItemReader, на два разных процессора и записи. То, что я пытаюсь достичь, это то, что...
+---> ItemProcessor#1 ---> ItemWriter#1
|
ItemReader ---> item ---+
|
+---> ItemProcessor#2 ---> ItemWriter#2
Это необходимо, потому что элементы, написанные ItemWriter # 1, должны обрабатываться совершенно по-другому, чем те, которые написаны ItemWriter # 2.
Кроме того, ItemReader считывает элемент из базы данных, и запросы, которые он выполняет, настолько дороги, что выполнение одного и того же запроса дважды должно быть отброшено.
Какой-нибудь намек на то, как достичь такой настройки? Или, по крайней мере, логически эквивалентная настройка?
Ответы
Ответ 1
это решение, с которым я пришел.
Итак, идея состоит в том, чтобы закодировать новый Writer, который "содержит" как ItemProcessor, так и ItemWriter. Чтобы дать вам представление, мы назвали его PreprocessoWriter и этот основной код.
private ItemWriter<O> writer;
private ItemProcessor<I, O> processor;
@Override
public void write(List<? extends I> items) throws Exception {
List<O> toWrite = new ArrayList<O>();
for (I item : items) {
toWrite.add(processor.process(item));
}
writer.write(toWrite);
}
Там осталось много вещей. Например, управление элементом ItemStream. Но в нашем конкретном сценарии этого было достаточно.
Итак, вы можете просто объединить несколько PreprocessorWriter с CompositeWriter.
Ответ 2
Это решение действительно, если ваш элемент должен обрабатываться процессором # 1 и процессором # 2
Вы должны создать процессор # 0 с этой сигнатурой:
class Processor0<Item, CompositeResultBean>
где CompositeResultBean
- bean, определяемый как
class CompositeResultBean {
Processor1ResultBean result1;
Processor2ResultBean result2;
}
В Processor0 просто делегируйте работу на процессоры # 1 и # 2 и поместите результат в CompositeResultBean
CompositeResultBean Processor0.process(Item item) {
final CompositeResultBean r = new CompositeResultBean();
r.setResult1(processor1.process(item));
r.setResult2(processor2.process(item));
return r;
}
Ваш собственный писатель является CompositeItemWriter
, который делегирует писателю CompositeResultBean.result
1 или CompositeResultBean.result2
(посмотрите PropertyExtractingDelegatingItemWriter, возможно может помочь)
Ответ 3
Вы можете использовать CompositeItemProcessor и CompositeItemWriter
Он не будет выглядеть точно так же, как ваша схема, он будет последовательным, но он выполнит эту работу.
Ответ 4
Я последовал за предложением Luca использовать PropertyExtractingDelegatingItemWriter как писатель, и я смог работать с двумя разными объектами за один шаг.
Прежде всего, я сделал определение DTO, в котором хранятся два объекта/результаты от процессора
public class DatabaseEntry {
private AccessLogEntry accessLogEntry;
private BlockedIp blockedIp;
public AccessLogEntry getAccessLogEntry() {
return accessLogEntry;
}
public void setAccessLogEntry(AccessLogEntry accessLogEntry) {
this.accessLogEntry = accessLogEntry;
}
public BlockedIp getBlockedIp() {
return blockedIp;
}
public void setBlockedIp(BlockedIp blockedIp) {
this.blockedIp = blockedIp;
}
}
то я передал этот DTO писателю, класс PropertyExtractingDelegatingItemWriter, где я определяю два настраиваемых метода для записи сущностей в базу данных, см. ниже код моего автора:
@Configuration
public class LogWriter extends LogAbstract {
@Autowired
private DataSource dataSource;
@Bean()
public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() {
PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>();
propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"});
propertyExtractingDelegatingItemWriter.setTargetObject(this);
propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction");
return propertyExtractingDelegatingItemWriter;
}
public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException {
writeAccessLogTable(accessLogEntry);
if (blockedIp != null) {
writeBlockedIp(blockedIp);
}
}
private void writeBlockedIp(BlockedIp entry) throws SQLException {
PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)");
statement.setString(1, entry.getIp());
statement.setInt(2, threshold);
statement.setTimestamp(3, Timestamp.valueOf(startDate));
statement.setTimestamp(4, Timestamp.valueOf(endDate));
statement.setString(5, entry.getComment());
statement.execute();
}
private void writeAccessLogTable(AccessLogEntry entry) throws SQLException {
PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)");
statement.setTimestamp(1, Timestamp.valueOf(entry.getDate()));
statement.setString(2, entry.getIp());
statement.setString(3, entry.getRequest());
statement.setString(4, entry.getStatus());
statement.setString(5, entry.getUserAgent());
statement.execute();
}
}
с помощью этого подхода вы можете получить желаемое поведенческое поведение от одного считывателя для обработки нескольких объектов и сохранить их за один шаг
Ответ 5
Есть другое решение, если у вас есть разумное количество элементов (например, менее 1 Go): вы можете кэшировать результат своего выбора в коллекцию, завернутую в Spring bean.
Тогда u может просто прочитать коллекцию дважды без каких-либо затрат.