Разделить поток Java 8
Как реализовать операцию "раздела" на Java 8 Stream? Под разделом я имею в виду разделить поток на подпотоки заданного размера. Как-то он будет идентичен методу Guava Iterators.partition(), желательно, чтобы разделы были лениво оценены потоками, а не списком.
Ответы
Ответ 1
Невозможно разделить произвольный поток источника на партии фиксированного размера, поскольку это приведет к нарушению параллельной обработки. При параллельной обработке вы можете не знать, сколько элементов в первой подзадаче после разделения, поэтому вы не можете создавать разделы для следующей подзадачи до тех пор, пока первая не будет полностью обработана.
Однако можно создать поток разделов из произвольного доступа List
. Такая функция доступна, например, в моей библиотеке StreamEx
:
List<Type> input = Arrays.asList(...);
Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);
Или, если вы действительно хотите поток потоков:
Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);
Если вы не хотите зависеть от сторонних библиотек, вы можете реализовать такой метод ofSubLists
вручную:
public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
if (length <= 0)
throw new IllegalArgumentException("length = " + length);
int size = source.size();
if (size <= 0)
return Stream.empty();
int fullChunks = (size - 1) / length;
return IntStream.range(0, fullChunks + 1).mapToObj(
n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}
Эта реализация выглядит немного длиннее, но она учитывает некоторые угловые случаи, такие как размер списка близких к MAX_VALUE.
Если вы хотите использовать параллельное решение для неупорядоченного потока (так что вам все равно, какие элементы потока будут объединены в одну партию), вы можете использовать коллекционер, подобный этому (благодаря @sibnick для вдохновения):
public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<List<T>, A, R> downstream) {
class Acc {
List<T> cur = new ArrayList<>();
A acc = downstream.supplier().get();
}
BiConsumer<Acc, T> accumulator = (acc, t) -> {
acc.cur.add(t);
if(acc.cur.size() == batchSize) {
downstream.accumulator().accept(acc.acc, acc.cur);
acc.cur = new ArrayList<>();
}
};
return Collector.of(Acc::new, accumulator,
(acc1, acc2) -> {
acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
for(T t : acc2.cur) accumulator.accept(acc1, t);
return acc1;
}, acc -> {
if(!acc.cur.isEmpty())
downstream.accumulator().accept(acc.acc, acc.cur);
return downstream.finisher().apply(acc.acc);
}, Collector.Characteristics.UNORDERED);
}
Пример использования:
List<List<Integer>> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.toList()));
Результат:
[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]
Такой сборщик отлично потокобезопасен и производит упорядоченные партии для последовательного потока.
Если вы хотите применить промежуточное преобразование для каждой партии, вы можете использовать следующую версию:
public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
Collector<T, AA, B> batchCollector,
Collector<B, A, R> downstream) {
return unorderedBatches(batchSize,
Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}
Например, таким образом вы можете суммировать числа в каждой партии "на лету":
List<Integer> list = IntStream.range(0,20)
.boxed().parallel()
.collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue),
Collectors.toList()));
Ответ 2
При условии, что вы хотите использовать поток последовательно, можно разделить поток (а также выполнять связанные функции, такие как управление окнами - что, я думаю, именно то, что вам действительно нужно в этом случае). Две библиотеки, которые будут поддерживать разделение для стандартных потоков, - это циклоп-реагировать (я автор) и jOOλ, который расширяет циклоп-реагирование (для добавления функциональности, такой как Windowing).
У cyclops-streams есть набор статических функций StreamUtils для работы на Java Streams и ряд функций, таких как splitAt, headAndTail, splitBy, partition для разбиения.
Чтобы вывести окно потока в поток вложенных потоков размером 30, вы можете использовать метод окна.
С точки зрения OP, в терминах Streaming разделение потока на несколько потоков заданного размера является операцией Windowing (а не операцией Partitioning).
Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);
Существует класс расширения Stream под названием ReactiveSeq, который расширяет jool.Seq и добавляет функциональность Windowing, которая может сделать код немного чище.
ReactiveSeq<Integer> seq;
ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);
Как отмечает Тагир выше, это не подходит для параллельных потоков. Если вы хотите создать окно или пакетный поток, который вы хотите выполнить многопоточным способом. LazyFutureStream в циклоп-реакции может быть полезен (Windowing находится в списке дел, но в настоящее время доступно обычное старое пакетирование).
В этом случае данные будут передаваться из нескольких потоков, выполняющих поток, в очередь без ожидания для нескольких производителей/одного потребителя, и последовательные данные из этой очереди могут быть обработаны окнами перед тем, как снова распределены потокам.
Stream<List<Data>> batched = new LazyReact().range(0,1000)
.grouped(30)
.map(this::process);
Ответ 3
Похоже, как показал Джон Скит в своем comment, невозможно сделать разделы ленивыми. Для нелазных разделов у меня уже есть этот код:
public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
final Iterator<T> it = source.iterator();
final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
final Iterable<Stream<T>> iterable = () -> partIt;
return StreamSupport.stream(iterable.spliterator(), false);
}
Ответ 4
Я нашел элегантное решение: Iterable parts = Iterables.partition(stream::iterator, size)
Ответ 5
Самое элегантное и чистое решение java 8 для этой проблемы я нашел:
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
.mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
.collect(toList());
}
//https://stackoverflow.com/questions/23246983/get-the-next-higher-integer-value-in-java
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
return (list.size() + batchSize- 1) / batchSize;
}
Ответ 6
Это чистое решение Java, которое оценивалось лениво вместо использования List.
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable
currentBatch.add(new ArrayList<T>(batchSize));
return Stream.concat(stream
.sequential()
.map(new Function<T, List<T>>(){
public List<T> apply(T t){
currentBatch.get(0).add(t);
return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
}
}), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
.limit(1)
).filter(Objects::nonNull);
}
Метод возвращает Stream<List<T>>
для гибкости. Вы можете легко конвертировать его в Stream<Stream<T>>
через partition(something, 10).map(List::stream)
.
Ответ 7
Я думаю, что это возможно с каким-то взломом внутри:
создать класс утилиты для пакета:
public static class ConcurrentBatch {
private AtomicLong id = new AtomicLong();
private int batchSize;
public ConcurrentBatch(int batchSize) {
this.batchSize = batchSize;
}
public long next() {
return (id.getAndIncrement()) / batchSize;
}
public int getBatchSize() {
return batchSize;
}
}
и метод:
public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
ConcurrentBatch batch = new ConcurrentBatch(batchSize);
//hack java map: extends and override computeIfAbsent
Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
@Override
public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
List<T> rs = super.computeIfAbsent(key, mappingFunction);
//apply batchFunc to old lists, when new batch list is created
if(rs.isEmpty()){
for(Entry<Long, List<T>> e : entrySet()) {
List<T> batchList = e.getValue();
//todo: need to improve
synchronized (batchList) {
if (batchList.size() == batch.getBatchSize()){
batchFunc.accept(batchList);
remove(e.getKey());
batchList.clear();
}
}
}
}
return rs;
}
};
stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
.collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
.entrySet()
.stream()
//map contains only unprocessed lists (size<batchSize)
.forEach(e -> batchFunc.accept(e.getValue()));
}
Ответ 8
Вот быстрое решение от AbacusUtil
IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));
Отказ от ответственности: я разработчик AbacusUtil.