Java 8 Streams: подсчет всех элементов, которые входят в операцию терминала
Интересно, есть ли более хороший (или просто другой) подход, чтобы получить количество всех элементов, которые вводят операцию терминала в поток, а не следующее:
Stream<T> stream = ... // given as parameter
AtomicLong count = new AtomicLong();
stream.filter(...).map(...)
.peek(t -> count.incrementAndGet())
где count.get()
дает мне фактическое количество обработанных элементов на этом этапе.
Я намеренно пропустил операцию терминала, поскольку это может измениться между .forEach
, .reduce
или .collect
.
Я уже знаю .count
, но, похоже, он работает хорошо, только если я обмениваю .forEach
на .map
и использую .count
как терминальная операция. Но мне кажется, что .map
используется неправильно.
Что мне не очень нравится в вышеупомянутом решении: если после него добавляется фильтр, он просто подсчитывает элементы на этом конкретном этапе, но не те, которые входят в операцию терминала.
Другой подход, который приходит мне на ум, - это collect
отфильтрованные и отображенные значения в список, и работать с ним и просто вызвать list.size()
, чтобы получить счет. Однако это не сработает, если сбор потока приведет к ошибке, тогда как с вышеупомянутым решением я мог бы иметь счет для всех обработанных элементов до сих пор, если соответствующий try/catch
находится на месте. Это, однако, не является жестким требованием.
Ответы
Ответ 1
Кажется, у вас уже есть самое чистое решение через peek
перед операцией терминала IMO. Единственная причина, по которой я могу думать, что это необходимо, - это отладка целей, и если это так, то для этого был разработан peek
. Обтекание Stream для этого и предоставление отдельных реализаций слишком много - помимо огромного количества времени и более поздней поддержки для всего, что добавляется в Streams
.
Для части того, что, если добавлен еще один фильтр? Ну, дайте комментарий к коду (многие из нас это делают) и несколько тестовых примеров, которые в противном случае могли бы потерпеть неудачу, например.
Только мои 0,02 $
Ответ 2
Лучшая идея, которая возможна, - это использовать отображение на себе и при этом подсчитывать вызов программы отображения.
steam.map(object -> {counter.incrementAndGet(); return object;});
Так как эта лямбда может быть использована повторно, и вы можете заменить любую лямбду на объект, вы можете создать объект-счетчик следующим образом:
class StreamCounter<T> implements Function<? super T,? extends T> {
int counter = 0;
public T apply(T object) { counter++; return object;}
public int get() { return counter;}
}
Итак, используя:
StreamCounter<String> myCounter = new ...;
stream.map(myCounter)...
int count = myCounter.get();
Так как снова вызов карты - это еще одна точка повторного использования, метод карты может быть обеспечен путем расширения потока и переноса обычного потока.
Таким образом вы можете создать что-то вроде:
AtomicLong myValue = new AtomicLong();
...
convert(stream).measure(myValue).map(...).measure(mySecondValue).filter(...).measure(myThirdValue).toList(...);
Таким образом, вы можете просто иметь свою собственную Stream-обертку, которая прозрачно обертывает каждый поток в своей собственной версии (которая не является служебной или служебной информацией) и измеряет мощность любой такой точки измерения.
Это часто делается при анализе сложности алгоритмов при создании решений map/reduce. Расширяя реализацию потока, не беря атомный длинный экземпляр для подсчета, а только имя меры, ваша реализация потока может содержать неограниченное количество точек измерения, обеспечивая гибкий способ печати отчета.
Такая реализация может запомнить конкретную последовательность методов потока вместе с положением каждой точки измерения и выводит такие результаты, как:
list -> (32k)map -> (32k)filter -> (5k)map -> avg().
Такая реализация потока записывается один раз, может использоваться для тестирования, а также для отчетности.
Встраивание в повседневную реализацию дает возможность собирать статистику для определенной обработки и допускать динамическую оптимизацию с использованием другой перестановки операций. Это будет, например, оптимизатор запросов.
Итак, в вашем случае лучше всего использовать сначала StreamCounter
и в зависимости от частоты использования, количество счетчиков и близость к DRY-принципу в конечном итоге реализуют более сложное решение позже.
PS: StreamCounter
использует значение int и не является потокобезопасным, поэтому в настройке параллельного потока вместо int
следует заменить экземпляр AtomicInteger
.