Ограничить поток предикатом

Существует ли операция потока Java 8, которая ограничивает (потенциально бесконечный) Stream до тех пор, пока первый элемент не будет соответствовать предикату?

В Java 9 мы можем использовать takeWhile как в примере ниже, чтобы напечатать все числа меньше 10.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

Поскольку в Java 8 нет такой операции, каков наилучший способ ее реализации в общем виде?

Ответы

Ответ 1

Такая операция должна быть возможна с помощью Java 8 Stream, но это не обязательно может быть сделано эффективно - например, вы не можете распараллеливать такую ​​операцию, поскольку вы должны смотреть на элементы в порядок.

API не обеспечивает простой способ сделать это, но, пожалуй, самый простой способ - взять Stream.iterator(), обернуть Iterator, чтобы иметь реализацию "take-while", а затем вернуться к Spliterator, а затем a Stream. Или, может быть, оберните Spliterator, хотя в этой реализации он больше не может быть разделен.

Здесь непроверенная реализация takeWhile на Spliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}

Ответ 2

Операции takeWhile и dropWhile были добавлены в JDK 9. Ваш примерный код

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

будет вести себя точно так, как вы ожидаете, при компиляции и запуске под JDK 9.

Вышел JDK 9. Его можно скачать здесь: http://jdk.java.net/9/

Ответ 3

allMatch() является функцией короткого замыкания, поэтому вы можете использовать ее для прекращения обработки. Главный недостаток заключается в том, что вам нужно дважды выполнить свой тест: один раз посмотреть, следует ли его обрабатывать, и снова посмотреть, продолжать ли идти.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);

Ответ 4

В качестве продолжения ответа @StuartMarks. В моей библиотеке StreamEx есть операция takeWhile, которая совместим с текущей реализацией JDK-9. При работе под JDK-9 он просто делегирует реализацию JDK (через MethodHandle.invokeExact, которая очень быстро). При работе под JDK-8 будет использоваться реализация "polyfill". Поэтому, используя мою библиотеку, проблема может быть решена следующим образом:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);

Ответ 5

takeWhile является одной из функций, предоставляемых библиотекой протонпаков .

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));

Ответ 6

Вы можете использовать java8 + rxjava.

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));

Ответ 7

Обновление: Java 9 Stream теперь поставляется с методом takeWhile.

Нет необходимости для взлома или других решений. Просто используйте это!


Я уверен, что это может быть значительно улучшено: (кто-то может сделать это потокобезопасным, может быть)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));

Взломать наверняка... Не элегантно - но это работает ~: D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}

Ответ 8

На самом деле есть два способа сделать это в Java 8 без каких-либо дополнительных библиотек или с использованием Java 9.

Если вы хотите печатать цифры от 2 до 20 на консоли, вы можете сделать это:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);

или

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);

Выход в обоих случаях:

2
4
6
8
10
12
14
16
18
20

Никто еще не упомянул anyMatch. Вот почему этот пост.

Ответ 9

Это источник, скопированный из JDK 9 java.util.stream.Stream.takeWhile(Predicate). Небольшая разница для работы с JDK 8.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

Ответ 10

Вот версия, сделанная в ints - как задано в вопросе.

Использование:

StreamUtil.takeWhile(IntStream.iterate(1, n -> n + 1), n -> n < 10);

Здесь код для StreamUtil:

import java.util.PrimitiveIterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class StreamUtil
{
    public static IntStream takeWhile(IntStream stream, IntPredicate predicate)
    {
        return StreamSupport.intStream(new PredicateIntSpliterator(stream, predicate), false);
    }

    private static class PredicateIntSpliterator extends Spliterators.AbstractIntSpliterator
    {
        private final PrimitiveIterator.OfInt iterator;
        private final IntPredicate predicate;

        public PredicateIntSpliterator(IntStream stream, IntPredicate predicate)
        {
            super(Long.MAX_VALUE, IMMUTABLE);
            this.iterator = stream.iterator();
            this.predicate = predicate;
        }

        @Override
        public boolean tryAdvance(IntConsumer action)
        {
            if (iterator.hasNext()) {
                int value = iterator.nextInt();
                if (predicate.test(value)) {
                    action.accept(value);
                    return true;
                }
            }

            return false;
        }
    }
}

Ответ 11

Пойдите, чтобы получить библиотеку AbacusUtil. Он предоставляет точный API, который вы хотите, и многое другое:

IntStream.iterate(1, n -> n + 1).takeWhile(n -> n < 10).forEach(System.out::println);

Декларация: я разработчик AbacusUtil.

Ответ 12

Если вы знаете точное количество повторений, которые будут выполнены, вы можете сделать

IntStream
          .iterate(1, n -> n + 1)
          .limit(10)
          .forEach(System.out::println);

Ответ 13

Вы не можете прервать поток, за исключением операции с коротким замыканием, которая оставила бы некоторые значения потока необработанными независимо от их значения. Но если вы просто хотите избежать операций над потоком, вы можете добавить преобразование и фильтр в поток:

import java.util.Objects;

class ThingProcessor
{
    static Thing returnNullOnCondition(Thing thing)
    {    return( (*** is condition met ***)? null : thing);    }

    void processThings(Collection<Thing> thingsCollection)
    {
        thingsCollection.stream()
        *** regular stream processing ***
        .map(ThingProcessor::returnNullOnCondition)
        .filter(Objects::nonNull)
        *** continue stream processing ***
    }
} // class ThingProcessor

Это преобразует поток вещей в нуль, когда вещи удовлетворяют некоторому условию, а затем отфильтровывает нули. Если вы готовы побаловать побочные эффекты, вы можете установить значение условия true, как только произойдет что-то, поэтому все последующие вещи отфильтровываются независимо от их значения. Но даже если вы не можете сэкономить много (если не совсем) обработку, отфильтровывая значения из потока, который вы не хотите обрабатывать.

Ответ 14

Даже у меня было похожее требование - запусти веб-сервис, если не получится, повторите его 3 раза. Если это не удается даже после этих многочисленных испытаний, отправьте уведомление по электронной почте. После многих anyMatch() стал спасителем. Мой пример кода выглядит следующим образом. В следующем примере, если метод webServiceCall возвращает true в самой первой итерации, поток не выполняет дальнейшую итерацию, как мы вызвали anyMatch(). Я считаю, это то, что вы ищете.

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}

Ответ 15

    IntStream.iterate(1, n -> n + 1)
    .peek(System.out::println) //it will be executed 9 times
    .filter(n->n>=9)
    .findAny();

вместо пика вы можете использовать mapToObj для возврата конечного объекта или сообщения

    IntStream.iterate(1, n -> n + 1)
    .mapToObj(n->{   //it will be executed 9 times
            if(n<9)
                return "";
            return "Loop repeats " + n + " times";});
    .filter(message->!message.isEmpty())
    .findAny()
    .ifPresent(System.out::println);

Ответ 16

Если у вас другая проблема, вам может понадобиться другое решение, но для вашей текущей проблемы я просто зайду:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);

Ответ 17

У меня есть еще одно быстрое решение, реализуя это (что на самом деле нечисто, но вы понимаете):

public static void main(String[] args) {
    System.out.println(StreamUtil.iterate(1, o -> o + 1).terminateOn(15)
            .map(o -> o.toString()).collect(Collectors.joining(", ")));
}

static interface TerminatedStream<T> {
    Stream<T> terminateOn(T e);
}

static class StreamUtil {
    static <T> TerminatedStream<T> iterate(T seed, UnaryOperator<T> op) {
        return new TerminatedStream<T>() {
            public Stream<T> terminateOn(T e) {
                Builder<T> builder = Stream.<T> builder().add(seed);
                T current = seed;
                while (!current.equals(e)) {
                    current = op.apply(current);
                    builder.add(current);
                }
                return builder.build();
            }
        };
    }
}

Ответ 18

Вот моя попытка использовать только библиотеку Java Stream.

        IntStream.iterate(0, i -> i + 1)
        .filter(n -> {
                if (n < 10) {
                    System.out.println(n);
                    return false;
                } else {
                    return true;
                }
            })
        .findAny();

Ответ 19

Может быть, немного не по теме, но это то, что мы имеем для List<T> а не для Stream<T>.

Для начала нужно использовать метод take util. Этот метод занимает первые n элементов:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

это просто работает как scala.List.take

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

теперь будет довольно просто написать метод takeWhile основанный на take

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

это работает так:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

эта реализация частично повторяет список несколько раз, но не добавляет операций добавления O(n^2). Надеюсь, что это приемлемо.