Java flatmap Iterator <Pair <Stream <A>, Stream <B>>> to Pair <Stream <A>, Stream <B>>

Я пытаюсь реализовать метод со следующей сигнатурой:

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator);

Если целью метода является сглаживание каждого из типов потоков в один поток и завершение вывода в пару. У меня есть только Iterator (не Iterable), и я не могу изменить подпись метода, поэтому мне нужно выполнить сглаживание в одной итерации.

Моя самая лучшая реализация -

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>> iterator) {
    Stream<A> aStream = Stream.empty();
    Stream<B> bStream = Stream.empty();
    while(iterator.hasNext()) {
        Pair<Stream<A>, Stream<B>> elm = iterator.next();
        aStream = Stream.concat(aStream, elm.first);
        bStream = Stream.concat(bStream, elm.second);
    }
    return Pair.of(aStream, bStream);
}

Но пока это технически правильно, я не очень доволен этим по двум причинам:

Кажется, что Stream # flatMap должен быть здесь подходящим (после преобразования входного Iterator в Stream с использованием Guava Streams.stream(Iterator), но, похоже, он не работает из-за типа пары в середине.

Еще одно требование состоит в том, что любой из итераторов/потоков может быть очень большим (вход может содержать где угодно от одной пары чрезвычайно больших потоков ко многим потокам одного элемента, например), поэтому решения в идеале не должны содержать сбор результаты в коллекции в памяти.

Ответы

Ответ 1

Ну, гуава Streams.stream - это не волшебство, а это действительно внутренне:

StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);

Поэтому, возможно, нет необходимости связывать это с вашим методом, пока вы можете использовать его напрямую.

И вы можете использовать Stream.Builder только для этого:

public static <A, B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {

    Stream.Builder<Stream<A>> builderA = Stream.builder();
    Stream.Builder<Stream<B>> builderB = Stream.builder();

    iterator.forEachRemaining(pair -> {
        builderA.add(pair.first);
        builderB.add(pair.second);
    });

    return Pair.of(builderA.build().flatMap(Function.identity()), builderB.build().flatMap(Function.identity()));
}

Ответ 2

Избегать сбора всего Iterator (как вы на самом деле делаете в вопросе) довольно сложно, так как вы не знаете, как будут обрабатываться результирующие потоки: можно полностью уничтожить, требуя, чтобы итератор полностью потреблял ну, в то время как другой не потребляется вообще, требуя отслеживать все произведенные пары - эффективно собирать их где-то.

Только если потоки будут потребляться более или менее на "скорости", вы можете извлечь выгоду из того, что не собираете весь итератор. Но такое потребление подразумевает либо работу с итератором одного из результирующих потоков, либо потребление потоков в параллельных потоках, что потребует дополнительной синхронизации.

Поэтому я предлагаю собрать все пары в List вместо этого, а затем сгенерировать новый Pair из этого списка:

public static <A,B> Pair<Stream<A>, Stream<B>> flatten(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    Iterable<Pair<Stream<A>, Stream<B>>> iterable = () -> iterator;
    final List<Pair<Stream<A>, Stream<B>>> allPairs =
        StreamSupport.stream(iterable.spliterator(), false)
            .collect(Collectors.toList());

    return Pair.of(
            allPairs.stream().flatMap(p -> p.first),
            allPairs.stream().flatMap(p -> p.second)
    );
}

Это еще не поглощает ни одного из исходных потоков, сохраняя при этом простое решение, которое позволяет избежать вложенных конкатенаций потоков.

Ответ 3

Прежде всего, это будет "более функциональная" версия вашего кода, и вы скажете, что предпочитаете стилистически:

<A, B> Pair<Stream<A>, Stream<B>> flattenFunctional(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    return Streams.stream(iterator)
        .reduce(Pair.of(Stream.empty(), Stream.empty()),
            (a, b) -> Pair.of(
                Stream.concat(a.first, b.first),
                Stream.concat(a.second, b.second)));
}

Предупреждение о возможном StackOverflowError по-прежнему применяется здесь, когда используется Stream.concat.

Чтобы избежать этого, а также думать о производительности и использовании памяти для больших наборов данных, у меня есть следующее предложение (не работает вообще). Вы можете создать пару настраиваемых Iterator (для типов A, B) и использовать Guava Streams.stream() для получения пары потоков. Поместите эти пользовательские итераторы в класс с парой стеков итераторов. Если, например, в первой паре в Iterator, Stream<A> имеет меньше элементов, чем Stream<B>, то после того, как Stream<A> исчерпан, вызовите iterator.next() и нажмите итератор B в свой стек. Вот класс с парой стеков (добавьте конструктор):

class PairStreamIterator<A, B> {
    private final Iterator<Pair<Stream<A>, Stream<B>>> iterator;
    private final Queue<Iterator<A>> stackA = new ArrayDeque<>();
    private final Queue<Iterator<B>> stackB = new ArrayDeque<>();

    Iterator<A> getItA() {
        return new Iterator<A>() {
            @Override public boolean hasNext() {
                if (!stackA.isEmpty() && !stackA.peek().hasNext()) {
                    stackA.remove();
                    return hasNext();
                } else if (!stackA.isEmpty() && stackA.peek().hasNext()) {
                    return true;
                } else if (iterator.hasNext()) {
                    Pair<Stream<A>, Stream<B>> pair = iterator.next();
                    stackA.add(pair.first.iterator());
                    stackB.add(pair.second.iterator());
                    return hasNext();
                }
                return false;
            }

            @Override public A next() {
                return stackA.peek().next();
            }
        };
    }    
    // repeat for Iterator<B>
}

и flatten:

<A, B> Pair<Stream<A>, Stream<B>> flattenIt(Iterator<Pair<Stream<A>, Stream<B>>> iterator) {
    final PairStreamIterator<A, B> pair = new PairStreamIterator<>(iterator);
    return Pair.of(Streams.stream(pair.getItA()), Streams.stream(pair.getItB()));
}

2 стека обычно содержат 1 или 2 итератора, если вы потребляете 2 потока в паре результатов flatten с той же скоростью. Худший сценарий - если вы планируете полностью поглотить один из потоков получающейся пары, а затем другой. В этом случае все итераторы, необходимые для второго сплющенного потока, останутся в стеке итераторов. Я не думаю, что я могу испугаться. Поскольку они хранятся в куче в памяти, вы не получите StackOverflowError, хотя вы все равно можете получить OutOfMemoryError

Возможным предостережением является использование рекурсии в hasNext. Это будет проблемой только в том случае, если вы встретите много последовательных пустых потоков на вашем входе.