Ответ 1
В теории вы можете сделать что-то вроде этого:
public void findInterestingFoo(Stream<Foo> foos) {
Spliterator<Foo> sp = foos.spliterator();
long size = sp.getExactSizeIfKnown();// returns -1 if not known
// or sp.estimateSize(); // Long.MAX_VALUE means "unknown"
internalState.update(
StreamSupport.stream(sp, size > PARALLEL_THRESHOLD)
.collect(customCollector()));
}
spliterator()
- это операция терминала, которая потребляет входной поток, но вы можете передать Spliterator
до StreamSupport.stream
для создания потока с точно такими же свойствами. Второй параметр уже указывает, должен ли поток быть параллельным.
В теории.
На практике реализация текущего потока возвращает разные реализации Spliterator
в зависимости от того, параллелен ли поток или нет. Это означает, что воссоздание потока в виде параллельного потока может закончиться потоком, который не способен выполнять параллельную обработку, когда исходный поток уже не был параллелен перед вызовом spliterator()
.
Хорошо работает, если нет промежуточных операций, например. когда вы прямо передаете Stream
, созданный из коллекции или массива.
Вызов parallel()
до spliterator()
, чтобы получить поток, поддерживающий параллельную передачу, который может продолжаться последовательно, если вы решите это сделать, работает в во многих случаях. Однако, если во входном потоке есть промежуточные операции с выражением состояния, такие как sorted()
, они могут быть исправлены для параллельной работы, даже если вы последовательно collect
(или наоборот).
Другая проблема имеет фундаментальный характер. Количество элементов на самом деле не говорит, будет ли преимущество в параллельной обработке или нет. Это зависит от рабочей нагрузки на каждом элементе, что зависит не только от операции с вашим терминалом collect
, но и от операций, уже привязанных к потоку, перед входом в ваш метод. Даже если вы сделаете вывод, что рабочая нагрузка ваших коллекторов уже достаточно высока, чтобы заслуживать параллельной обработки, может быть, что входящий поток имеет операции типа skip
, limit
или distinct
(по упорядоченному потоку), которые часто ухудшаются параллельно и требуют совершенно другого порога.
Более простое решение состоит в том, чтобы позволить вызывающему абоненту решить, поскольку вызывающий абонент знает что-то о размере и характере потока. Вам даже не нужно добавлять параметр к вашей сигнатуре методов, поскольку вызывающий может уже принять решение, вызвав parallel()
или sequential()
в потоке, прежде чем передавать его вашему методу, и вы можете уважать это, просто не меняя режим.