Java 8: операция прекращения остановки с изучения всех элементов Stream

Я пытаюсь понять, есть ли способ прекратить операцию сокращения без изучения всего потока, и я не могу понять способ.

Пример использования выглядит примерно так: пусть будет длинный список Integer, который нужно сложить в Accumulator. Каждый элемент экзамена потенциально дорог, поэтому в Accumulator я выполняю проверку входящего Accumulator, чтобы увидеть, нужно ли нам даже выполнять дорогостоящую операцию - если мы этого не сделаем, я просто возвращаю аккумулятор.

Это, по-видимому, прекрасное решение для небольших (er) списков, но огромные списки несут излишние затраты на поток, которые я бы хотел избежать.

Здесь эскиз кода - предполагайте только последовательные сокращения.

class Accumulator {
    private final Set<A> setA = new HashSet<>;
    private final Set<B> setB = new HashSet<>;
}

class ResultSupplier implements Supplier<Result> {

    private final List<Integer> ids;

    @Override
    public Result get() {
        Accumulator acc = ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);

        return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB);
    }

    private static BiFunction<Accumulator, Integer, Accumulator> f() {
        return (acc, element) -> {
            if (acc.setA.size() <= 1) {
                // perform expensive ops and accumulate results
            }
            return acc;
        };
    }
}

В дополнение к тому, чтобы пройти весь Stream, есть еще один факт, который мне не нравится: мне нужно дважды проверить одно и то же условие (а именно, setA проверка размера).

Я рассмотрел операции map() и collect(), но они просто казались более похожими и не обнаружили, что они существенно меняют тот факт, что я просто не могу закончить операцию сбрасывания без изучения всего потока.

Кроме того, я думаю, что мнимый takeWhile(p : (A) => boolean) корреспондент Stream API также не будет нам ничего не покупать, поскольку условие завершения зависит от аккумулятора, а не от потоков элементов как таковых.

Имейте в виду, что я относительный новичок в FP - есть ли способ сделать эту работу так, как я ожидаю? Я создал всю проблему неправильно или это ограничение по дизайну?

Ответы

Ответ 1

Вместо того, чтобы начинать с ids.stream(), вы можете

  • использовать ids.spliterator()
  • завершает преобразование получателя в пользовательский разделитель, который имеет летучий логический флаг
  • имеет пользовательский разделитель tryAdvance возвращает false, если флаг изменен.
  • превратите свой собственный разделитель в поток с помощью StreamSupport.stream(Spliterator<T>, boolean)
  • продолжайте свой потоковый конвейер как раньше
  • выключить поток, переключив логическое значение, когда ваш аккумулятор заполнен.

Добавьте некоторые статические вспомогательные методы, чтобы поддерживать его работоспособность.

получившийся API может выглядеть примерно так.

Accumulator acc = terminateableStream(ids, (stream, terminator) ->
   stream.reduce(new Accumulator(terminator), f(), (x, y) -> null));

Кроме того, я думаю, что мнимая takeWhile (p: (A) = > boolean) Корреспондент Stream API также не купит нам ничего

Он работает, если условие зависит от состояния аккумулятора, а не от членов потока. Это, по сути, подход, описанный выше.

Вероятно, это было бы запрещено в takeWhile, предоставляемом JDK, но специальная реализация с использованием spliterators может принимать подход с учетом состояния.

Ответ 2

Конечно, будет интересный, чисто FP-ответ, который может помочь решить эту проблему так, как вы планируете.

Тем временем, зачем использовать FP вообще, когда простое решение является прагматически обязательным, и исходный источник данных в любом случае является List, который уже полностью материализован, и вы будете использовать последовательное сокращение, а не параллельное сокращение. Вместо этого напишите:

@Override
public Result get() {
    Accumulator acc = new Accumulator();

    for (Integer id : ids) {
        if (acc.setA.size() <= 1) {
            // perform expensive ops and accumulate results
        }

        // Easy:
        if (enough)
            break;
    }

    return (acc.setA.size > 1) ? Result.invalid() : Result.valid(acc.setB);
}

Ответ 3

Как упоминалось в комментариях: сценарий использования звучит немного сомнительно. С одной стороны, из-за использования reduce вместо collect, с другой стороны, из-за того, что условие, которое должно использоваться для остановки восстановления, также появляется в аккумуляторе. Это похоже на простое ограничение потока на определенное количество элементов или на основе условия, как показано в другом вопросе, может быть здесь более уместным.

Конечно, в реальном приложении может быть, что условие фактически не связано с количеством обработанных элементов. В этом случае я набросал здесь решение, которое в основном соответствует ответу by8472 и очень похоже на решение из упомянутого выше вопроса: он использует Stream, который создается из Spliterator, который просто делегирует исходный Spliterator, если не выполняется условие остановки.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        ResultSupplier r = new ResultSupplier();
        System.out.println(r.get());
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class ResultSupplier implements Supplier<String>
{
    private final List<Integer> ids;
    ResultSupplier()
    {
        ids = new ArrayList<Integer>(Collections.nCopies(20, 1));
    }

    public String get()
    {
        //return getOriginal();
        return getStopping();
    }

    private String getOriginal()
    {
        Accumulator acc =
            ids.stream().reduce(new Accumulator(), f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private String getStopping()
    {
        Spliterator<Integer> originalSpliterator = ids.spliterator();
        Accumulator accumulator = new Accumulator();
        Spliterator<Integer> stoppingSpliterator = 
            new Spliterators.AbstractSpliterator<Integer>(
                originalSpliterator.estimateSize(), 0)
            {
                @Override
                public boolean tryAdvance(Consumer<? super Integer> action)
                {
                    return accumulator.set.size() > 10 ? false : 
                        originalSpliterator.tryAdvance(action);
                }
            };
        Stream<Integer> stream = 
            StreamSupport.stream(stoppingSpliterator, false);
        Accumulator acc =
            stream.reduce(accumulator, f(), (x, y) -> null);
        return (acc.set.size() > 11) ? "invalid" : String.valueOf(acc.set);
    }

    private static int counter = 0;
    private static BiFunction<Accumulator, Integer, Accumulator> f()
    {
        return (acc, element) -> {

            System.out.print("Step " + counter);
            if (acc.set.size() <= 10)
            {
                System.out.print(" expensive");
                acc.set.add(counter);
            }
            System.out.println();
            counter++;
            return acc;
        };
    }
}

Изменить в ответ на комментарии:

Конечно, его можно написать "более функциональным". Однако из-за неопределенных описаний в вопросах и довольно "отрывочного" примера кода трудно найти "самое" подходящее решение здесь. (И "подходящий" относится к конкретным предостережениям фактической задачи и к вопросу насколько функционально это должно быть, не жертвуя удобочитаемостью).

Возможные шаги по функционализации могут включать создание общего класса StoppingSpliterator, который работает с делегатом Spliterator и имеет Supplier<Boolean> в качестве условия его остановки, и подает его с помощью Predicate в фактическом Accumulator, вместе с использованием некоторых методов утилиты и ссылок на методы здесь и там.

Но опять-таки: Это спорно, является ли это на самом деле соответствующее решение, или не следует, а использовать простое и практичное решение от ответ Лукаса Эдер...

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

public class StopStreamReduction
{
    public static void main(String[] args)
    {
        List<Integer> collection = 
            new ArrayList<Integer>(Collections.nCopies(20, 1));
        System.out.println(compute(collection));
    }

    private static String compute(List<Integer> collection)
    {
        Predicate<Accumulator> stopCondition = (a) -> a.set.size() > 10;
        Accumulator result = reduceStopping(collection, 
            new Accumulator(), StopStreamReduction::accumulate, stopCondition);
        return (result.set.size() > 11) ? "invalid" : String.valueOf(result.set);
    }

    private static int counter;
    private static Accumulator accumulate(Accumulator a, Integer element)
    {
        System.out.print("Step " + counter);
        if (a.set.size() <= 10)
        {
            System.out.print(" expensive");
            a.set.add(counter);
        }
        System.out.println();
        counter++;
        return a;
    }

    static <U, T> U reduceStopping(
        Collection<T> collection, U identity,
        BiFunction<U, ? super T, U> accumulator,
        Predicate<U> stopCondition)
    {
       // This assumes that the accumulator always returns
       // the identity instance (with the accumulated values).
       // This may not always be true!
       return StreamSupport.stream(
           new StoppingSpliterator<T>(
               collection.spliterator(), 
               () -> stopCondition.test(identity)), false).
                   reduce(identity, accumulator, (x, y) -> null);
    }
}

class Accumulator
{
    final Set<Integer> set = new HashSet<Integer>();
}

class StoppingSpliterator<T> extends Spliterators.AbstractSpliterator<T>
{
    private final Spliterator<T> delegate;
    private final Supplier<Boolean> stopCondition;

    StoppingSpliterator(Spliterator<T> delegate, Supplier<Boolean> stopCondition)
    {
        super(delegate.estimateSize(), 0);
        this.delegate = delegate;
        this.stopCondition = stopCondition;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        if (stopCondition.get())
        {
            return false;
        }
        return delegate.tryAdvance(action);
    }
}

Ответ 4

Реального решения FP нет, просто потому, что весь ваш аккумулятор не является FP. Мы не можем помочь вам в этом отношении, поскольку мы не знаем, что он на самом деле делает. Все, что мы видим, это то, что он опирается на две изменяемые коллекции и, следовательно, не может быть частью чистого решения FP.

Если вы согласны с ограничениями и что нет чистого способа использования API Stream, вы можете стремиться к простому способу. Простой способ включает в себя stateful Predicate, который не лучший, но иногда неизбежный:

public Result get() {
    int limit = 1;
    Set<A> setA=new HashSet<>();
    Set<B> setB=new HashSet<>();
    return ids.stream().anyMatch(i -> {
        // perform expensive ops and accumulate results
        return setA.size() > limit;
    })? Result.invalid(): Result.valid(setB);
}

Но я хочу отметить, что с учетом вашей конкретной логики, т.е. ваш результат считается недопустимым, когда набор становится слишком большим, ваша попытка обработки не слишком большого количества элементов - это оптимизация ошибочного случая. Вы не должны тратить усилия на оптимизацию этого. Если действительный результат является результатом обработки всех элементов, тогда обработайте все элементы...

Ответ 5

Я согласен со всеми предыдущими ответами. Вы делаете это неправильно, вызывая сокращение на изменяемом аккумуляторе. Кроме того, процесс, который вы описываете, не может быть выражен как конвейер преобразований и сокращений.

Если вам действительно нужно действительно сделать это в стиле FP, я бы сделал, как указывает @848472.

В любом случае, я даю вам новую более компактную альтернативу, подобную решению @lukas-eder, используя Iterator:

Function<Integer, Integer> costlyComputation = Function.identity();

Accumulator acc = new Accumulator();

Iterator<Integer> ids = Arrays.asList(1, 2, 3).iterator();

while (!acc.hasEnough() && ids.hasNext())
  costlyComputation.andThen(acc::add).apply(ids.next());

У вас есть две разные проблемы относительно FP:

Как остановить повторение

Поскольку вы зависите от изменяемого состояния, FP только сделает вашу жизнь труднее. Вы можете итерировать извне коллекцию или использовать Iterator, как я предлагаю.

Затем используйте if() для остановки итерации.

Вы можете подумать о разных стратегиях, но в конце дня это то, что вы используете.

Я предпочитаю итератор, потому что больше идиоматичен (в этом случае лучше выражает ваше намерение).

Как создать Аккумулятор и дорогостоящую операцию

Это самое интересное для меня.

Чистая функция не может иметь состояние, должна получать что-то и должна что-то возвращать и всегда одно и то же для одного и того же ввода (например, математической функции). Можете ли вы выразить свою дорогостоящую операцию?

Нужно ли какое-то общее состояние с Аккумулятором? Может быть, это разделение не принадлежит ни одному из них.

Преобразуете ли вы свой вход, а затем добавите его в Аккумулятор или это ответственность Аккумулятора? Имеет ли смысл вводить функцию в Аккумулятор?

Ответ 6

Я думаю, возможно выбросить RuntimeException специального типа из вашего пользовательского коллектора (или уменьшить операцию), который включает результат внутри объекта исключения и уловить его за пределами операции collect, разворачивая результат. Я знаю, что использование исключения для не исключительного потока управления не является идиоматическим, но оно должно работать в вашем случае даже для параллельных потоков.

На самом деле существует много случаев, когда сокращение короткого замыкания может быть полезным. Например, собирайте значения перечисления до EnumSet (вы можете остановить, как только обнаружите, что все возможные значения перечисления уже собраны). Или пересечь все элементы Stream<Set> (вы можете остановиться, если ваш результирующий набор станет пустым после некоторого шага: продолжение сокращения бесполезно). Внутри там используется флаг SHORT_CIRCUIT, используемый в потоковых операциях, таких как findFirst, но он не открыт для публичного API.