Java flatmap Iterator <Pair <Stream <A>, Stream <B>>> to Pair <Stream <A>, Stream <B>>
Я пытаюсь реализовать метод со следующей сигнатурой:
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator);
Если целью метода является сглаживание каждого из типов потоков в один поток и завершение вывода в пару. У меня есть только Iterator (не Iterable), и я не могу изменить подпись метода, поэтому мне нужно выполнить сглаживание в одной итерации.
Моя самая лучшая реализация -
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>> iterator) {
Stream<A> aStream = Stream.empty();
Stream<B> bStream = Stream.empty();
while(iterator.hasNext()) {
Pair<Stream<A>, Stream<B>> elm = iterator.next();
aStream = Stream.concat(aStream, elm.first);
bStream = Stream.concat(bStream, elm.second);
}
return Pair.of(aStream, bStream);
}
Но пока это технически правильно, я не очень доволен этим по двум причинам:
Кажется, что Stream # flatMap должен быть здесь подходящим (после преобразования входного Iterator в Stream с использованием Guava Streams.stream(Iterator), но, похоже, он не работает из-за типа пары в середине.
Еще одно требование состоит в том, что любой из итераторов/потоков может быть очень большим (вход может содержать где угодно от одной пары чрезвычайно больших потоков ко многим потокам одного элемента, например), поэтому решения в идеале не должны содержать сбор результаты в коллекции в памяти.
Ответы
Ответ 1
Ну, гуава Streams.stream
- это не волшебство, а это действительно внутренне:
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
Поэтому, возможно, нет необходимости связывать это с вашим методом, пока вы можете использовать его напрямую.
И вы можете использовать Stream.Builder
только для этого:
public static <A, B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
Stream.Builder<Stream<A>> builderA = Stream.builder();
Stream.Builder<Stream<B>> builderB = Stream.builder();
iterator.forEachRemaining(pair -> {
builderA.add(pair.first);
builderB.add(pair.second);
});
return Pair.of(builderA.build().flatMap(Function.identity()), builderB.build().flatMap(Function.identity()));
}
Ответ 2
Избегать сбора всего Iterator
(как вы на самом деле делаете в вопросе) довольно сложно, так как вы не знаете, как будут обрабатываться результирующие потоки: можно полностью уничтожить, требуя, чтобы итератор полностью потреблял ну, в то время как другой не потребляется вообще, требуя отслеживать все произведенные пары - эффективно собирать их где-то.
Только если потоки будут потребляться более или менее на "скорости", вы можете извлечь выгоду из того, что не собираете весь итератор. Но такое потребление подразумевает либо работу с итератором одного из результирующих потоков, либо потребление потоков в параллельных потоках, что потребует дополнительной синхронизации.
Поэтому я предлагаю собрать все пары в List
вместо этого, а затем сгенерировать новый Pair
из этого списка:
public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
Iterable<Pair<Stream<A>, Stream<B>>> iterable = () -> iterator;
final List<Pair<Stream<A>, Stream<B>>> allPairs =
StreamSupport.stream(iterable.spliterator(), false)
.collect(Collectors.toList());
return Pair.of(
allPairs.stream().flatMap(p -> p.first),
allPairs.stream().flatMap(p -> p.second)
);
}
Это еще не поглощает ни одного из исходных потоков, сохраняя при этом простое решение, которое позволяет избежать вложенных конкатенаций потоков.
Ответ 3
Прежде всего, это будет "более функциональная" версия вашего кода, и вы скажете, что предпочитаете стилистически:
<A, B> Pair<Stream<A>, Stream<B>> flattenFunctional(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
return Streams.stream(iterator)
.reduce(Pair.of(Stream.empty(), Stream.empty()),
(a, b) -> Pair.of(
Stream.concat(a.first, b.first),
Stream.concat(a.second, b.second)));
}
Предупреждение о возможном StackOverflowError
по-прежнему применяется здесь, когда используется Stream.concat
.
Чтобы избежать этого, а также думать о производительности и использовании памяти для больших наборов данных, у меня есть следующее предложение (не работает вообще). Вы можете создать пару настраиваемых Iterator
(для типов A
, B
) и использовать Guava Streams.stream()
для получения пары потоков. Поместите эти пользовательские итераторы в класс с парой стеков итераторов. Если, например, в первой паре в Iterator
, Stream<A>
имеет меньше элементов, чем Stream<B>
, то после того, как Stream<A>
исчерпан, вызовите iterator.next()
и нажмите итератор B
в свой стек. Вот класс с парой стеков (добавьте конструктор):
class PairStreamIterator<A, B> {
private final Iterator<Pair<Stream<A>, Stream<B>>> iterator;
private final Queue<Iterator<A>> stackA = new ArrayDeque<>();
private final Queue<Iterator<B>> stackB = new ArrayDeque<>();
Iterator<A> getItA() {
return new Iterator<A>() {
@Override public boolean hasNext() {
if (!stackA.isEmpty() && !stackA.peek().hasNext()) {
stackA.remove();
return hasNext();
} else if (!stackA.isEmpty() && stackA.peek().hasNext()) {
return true;
} else if (iterator.hasNext()) {
Pair<Stream<A>, Stream<B>> pair = iterator.next();
stackA.add(pair.first.iterator());
stackB.add(pair.second.iterator());
return hasNext();
}
return false;
}
@Override public A next() {
return stackA.peek().next();
}
};
}
// repeat for Iterator<B>
}
и flatten
:
<A, B> Pair<Stream<A>, Stream<B>> flattenIt(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
final PairStreamIterator<A, B> pair = new PairStreamIterator<>(iterator);
return Pair.of(Streams.stream(pair.getItA()), Streams.stream(pair.getItB()));
}
2 стека обычно содержат 1 или 2 итератора, если вы потребляете 2 потока в паре результатов flatten
с той же скоростью. Худший сценарий - если вы планируете полностью поглотить один из потоков получающейся пары, а затем другой. В этом случае все итераторы, необходимые для второго сплющенного потока, останутся в стеке итераторов. Я не думаю, что я могу испугаться. Поскольку они хранятся в куче в памяти, вы не получите StackOverflowError
, хотя вы все равно можете получить OutOfMemoryError
Возможным предостережением является использование рекурсии в hasNext
. Это будет проблемой только в том случае, если вы встретите много последовательных пустых потоков на вашем входе.