Есть ли элегантный способ обработки потока в кусках?
Мой точный сценарий заключается в том, что вставляем данные в базу данных партиями, поэтому я хочу накапливать объекты DOM, а затем каждые 1000, очищать их.
Я реализовал его, поместив код в накопитель, чтобы обнаружить полноту, затем сбросить, но это кажется неправильным - управление потоком должно поступать от вызывающего.
Я мог бы преобразовать поток в список, а затем использовать subList в итеративном режиме, но это тоже кажется неуклюжим.
Есть ли там аккуратный способ предпринять действие для каждого n элементов, а затем продолжить поток, а только обрабатывать поток один раз?
Ответы
Ответ 1
Элегантность в глазах смотрящего. Если вы не возражаете использовать функцию состояния в groupingBy
, вы можете сделать это:
AtomicInteger counter = new AtomicInteger();
stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize))
.values()
.forEach(database::flushChunk);
Это не приведет к выигрышу в производительности или памяти за исходное решение, потому что оно все равно материализует весь поток, прежде чем что-либо делать.
Если вы хотите избежать материализации списка, stream API вам не поможет. Вам нужно будет получить итератор потока или spliterator и сделать что-то вроде этого:
Spliterator<Integer> split = stream.spliterator();
int chunkSize = 1000;
while(true) {
List<Integer> chunk = new ArrayList<>(size);
for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){};
if (chunk.isEmpty()) break;
database.flushChunk(chunk);
}
Ответ 2
Использование библиотеки StreamEx решение будет выглядеть
Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15);
AtomicInteger counter = new AtomicInteger(0);
int chunkSize = 4;
StreamEx.of(stream)
.groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0)
.forEach(chunk -> System.out.println(chunk));
Выход:
[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14]
groupRuns
принимает предикат, который решает, должны ли 2 элемента быть в одной группе.
Он создает группу, как только находит первый элемент, который ей не принадлежит.
Ответ 3
Если у вас есть зависимость от guava для вашего проекта, вы можете сделать это:
StreamSupport.stream(Iterables.partition(simpleList, 1000).spliterator(), false).forEach(...);
См https://google.github.io/guava/releases/23.0/api/docs/com/google/common/collect/Lists.html#partition-java.util.List-int-
Ответ 4
Вы можете создать поток чанков (List<T>
) из потока элементов и заданного размера чанка:
- группирование элементов по индексу чанка (индекс элемента/размер чанка)
- упорядочение кусков по их индексу
- сводя карту только к упорядоченным элементам
Код:
public static <T> Stream<List<T>> chunked(Stream<T> stream, int chunkSize) {
AtomicInteger index = new AtomicInteger(0);
return stream.collect(Collectors.groupingBy(x -> index.getAndIncrement() / chunkSize))
.entrySet().stream()
.sorted(Map.Entry.comparingByKey()).map(Map.Entry::getValue);
}
Пример использования:
Stream<Integer> stream = IntStream.range(0, 100).mapToObj(Integer::valueOf);
Stream<List<Integer>> chunked = chunked(stream, 8);
chunked.forEach(chunk -> System.out.println("Chunk: " + chunk));
Выход:
Chunk: [0, 1, 2, 3, 4, 5, 6, 7]
Chunk: [8, 9, 10, 11, 12, 13, 14, 15]
Chunk: [16, 17, 18, 19, 20, 21, 22, 23]
Chunk: [24, 25, 26, 27, 28, 29, 30, 31]
Chunk: [32, 33, 34, 35, 36, 37, 38, 39]
Chunk: [40, 41, 42, 43, 44, 45, 46, 47]
Chunk: [48, 49, 50, 51, 52, 53, 54, 55]
Chunk: [56, 57, 58, 59, 60, 61, 62, 63]
Chunk: [64, 65, 66, 67, 68, 69, 70, 71]
Chunk: [72, 73, 74, 75, 76, 77, 78, 79]
Chunk: [80, 81, 82, 83, 84, 85, 86, 87]
Chunk: [88, 89, 90, 91, 92, 93, 94, 95]
Chunk: [96, 97, 98, 99]
Ответ 5
Как правильно сказал Миша, Элегантность в глазах смотрящего. Я лично думаю, что элегантным решением было бы позволить классу, который вставляет в базу данных, выполнить эту задачу. Похоже на BufferedWriter
. Таким образом, он не зависит от вашей исходной структуры данных и может использоваться даже с несколькими потоками один за другим. Я не уверен, что это именно то, что вы имеете в виду, имея код в аккумуляторе, который вы считаете неправильным. Я не думаю, что это неправильно, так как существующие классы, такие как BufferedWriter
работают таким образом. Таким образом, у вас есть некоторый контроль сброса вызывающей стороны, вызывая flush()
на модуле записи в любой момент.
Что-то вроде следующего кода.
class BufferedDatabaseWriter implements Flushable {
List<DomObject> buffer = new LinkedList<DomObject>();
public void write(DomObject o) {
buffer.add(o);
if(buffer.length > 1000)
flush();
}
public void flush() {
//write buffer to database and clear it
}
}
Теперь ваш поток обрабатывается так:
BufferedDatabaseWriter writer = new BufferedDatabaseWriter();
stream.forEach(o -> writer.write(o));
//if you have more streams stream2.forEach(o -> writer.write(o));
writer.flush();
Если вы хотите работать в многопоточном режиме, вы можете запустить асинхронный сброс. Взятие из потока не может идти параллельно, но я не думаю, что есть способ посчитать 1000 элементов из потока параллельно.
Вы также можете расширить AutoCloseable
чтобы разрешить установку размера буфера в конструкторе, или сделать так, чтобы он реализовал AutoCloseable
и запустил его, попробовав ресурсы и многое другое. Хорошие вещи, которые вы получили от BufferedWriter
.
Ответ 6
Похоже, нет, потому что создание кусков означает сокращение потока, а уменьшение - завершение. Если вам необходимо поддерживать природу потоков и чанки процессов, не собирая все данные, прежде чем вот мой код (не работает для параллельных потоков):
private static <T> BinaryOperator<List<T>> processChunks(Consumer<List<T>> consumer, int chunkSize) {
return (data, element) -> {
if (data.size() < chunkSize) {
data.addAll(element);
return data;
} else {
consumer.accept(data);
return element; // in fact it new data list
}
};
}
private static <T> Function<T, List<T>> createList(int chunkSize) {
AtomicInteger limiter = new AtomicInteger(0);
return element -> {
limiter.incrementAndGet();
if (limiter.get() == 1) {
ArrayList<T> list = new ArrayList<>(chunkSize);
list.add(element);
return list;
} else if (limiter.get() == chunkSize) {
limiter.set(0);
}
return Collections.singletonList(element);
};
}
и как использовать
Consumer<List<Integer>> chunkProcessor = (list) -> list.forEach(System.out::println);
int chunkSize = 3;
Stream.generate(StrTokenizer::getInt).limit(13)
.map(createList(chunkSize))
.reduce(processChunks(chunkProcessor, chunkSize))
.ifPresent(chunkProcessor);
static Integer i = 0;
static Integer getInt()
{
System.out.println("next");
return i++;
}
он напечатает
следующий следующий следующий следующий 0 1 2 следующий следующий следующий 3 4 5 следующий следующий следующий 6 7 8 следующий следующий следующий 9 10 11 12
Идея состоит в том, чтобы создавать списки в операции карты с "шаблоном"
[1,],[2],[3],[4,]...
и объединить (+process), что с уменьшением.
[1,2,3],[4,5,6],...
и не забудьте обработать последний "обрезанный" кусок с помощью
.ifPresent(chunkProcessor);