Вычисление с учетом состояния потока: совокупные суммы
Предполагая, что у меня есть Java IntStream, можно ли преобразовать его в IntStream с суммарными суммами? Например, поток, начинающийся с [4, 2, 6,...], должен быть преобразован в [4, 6, 12,...].
В более общем плане, как следует идти об осуществлении операций с потоком состояний? Кажется, это должно быть возможно:
myIntStream.map(new Function<Integer, Integer> {
int sum = 0;
Integer apply(Integer value){
return sum += value;
}
);
С очевидным ограничением, что это работает только на последовательные потоки. Однако Stream.map явно требует функцию отображения без сохранения. Правильно ли я теряю поток Stream.statefulMap или Stream.cumulative или не хватает точки потоков Java?
Сравните, например, с Haskell, где функция scanl1 решает именно этот пример:
scanl1 (+) [1 2 3 4] = [1 3 6 10]
Ответы
Ответ 1
Вы можете сделать это с помощью атомного номера. Например:
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
public class Accumulator {
public static LongStream toCumulativeSumStream(IntStream ints){
AtomicLong sum = new AtomicLong(0);
return ints.sequential().mapToLong(sum::addAndGet);
}
public static void main(String[] args){
LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
sums.forEachOrdered(System.out::println);
}
}
Выводится:
1
3
6
10
Я использовал Long для хранения сумм, потому что вполне возможно, что два ints добавляются до уровня Integer.MAX_VALUE
, а у long меньше вероятность переполнения.
Ответ 2
Это возможно сделать с коллектором, который затем создает новый поток:
class Accumulator {
public static void accept(List<Integer> list, Integer value) {
list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
}
public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
int total = list1.get(list1.size() - 1);
list2.stream().map(n -> n + total).forEach(list1::add);
return list1;
}
}
Это используется как:
myIntStream.parallel()
.collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
.stream();
Надеюсь, вы увидите, что важным атрибутом этого коллектора является то, что даже если поток параллелен при объединении экземпляров Accumulator
, он корректирует итоговые значения.
Это, очевидно, не так эффективно, как операция с картой, потому что он собирает весь поток и затем создает новый поток. Но это не просто деталь реализации: это необходимая функция того, что потоки предназначены для потенциальной параллельной обработки.
Я тестировал его с помощью IntStream.range(0, 10000).parallel()
, и он работает правильно.
Ответ 3
Следующий код поможет вам сделать это.
public static void main(String[] args) {
List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
Integer[] previous = {0};
List<Integer> data = integers.stream().map(no -> no + previous[0]).map(no -> previous[0] = no)
.collect(Collectors.toList());
System.out.println(data);
}