Ответ 1
Тщательное чтение кода реализации потоков в ReduceOps.java показывает, что функция объединения вызывается только тогда, когда завершается ReduceTask
, и ReduceTask
экземпляры используются только при параллельной оценке конвейера. Таким образом, в текущей реализации комбайнер никогда не вызывается при оценке последовательного конвейера.
В спецификации нет ничего, что бы гарантировать это. A Collector
- это интерфейс, который создает требования к его реализациям, и нет исключений, предоставляемых для последовательных потоков. Лично мне трудно представить, почему последовательная оценка конвейера может потребоваться вызвать комбайнера, но кто-то с большим воображением, чем я, может найти для этого разумное применение и реализовать его. Спецификация позволяет это, и хотя сегодня реализация не делает этого, вам все равно придется подумать об этом.
Это не должно удивлять. Проектный центр API потоков поддерживает параллельное выполнение на равных началах с последовательным выполнением. Конечно, программа может наблюдать, будет ли она выполняться последовательно или параллельно. Но дизайн API - это поддержка стиля программирования, который позволяет либо.
Если вы пишете коллекционер, и вы обнаружите, что невозможно (или неудобно или сложно) написать функцию ассоциативного объединителя, что побуждает вас ограничивать ваш поток последовательным исполнением, возможно, это означает, что вы направляетесь в неправильное направление. Пришло время немного отступить и подумать о том, чтобы подойти к проблеме по-другому.
Обычная операция стиля сокращения, которая не требует функции ассоциативного сумматора, называется fold-left. Основная характеристика заключается в том, что функция сгиба применяется строго слева направо, исходя из одного за раз. Я не знаю, как можно распараллелить fold-left.
Когда люди пытаются объединить коллекционеры так, как мы говорили, они обычно ищут что-то вроде фолд-левого. API Streams API не имеет прямой поддержки API для этой операции, но довольно легко писать. Например, предположим, что вы хотите уменьшить список строк, используя эту операцию: повторите первую строку, а затем добавьте вторую. Довольно легко показать, что эта операция не является ассоциативной:
List<String> list = Arrays.asList("a", "b", "c", "d", "e");
System.out.println(list.stream()
.collect(StringBuilder::new,
(a, b) -> a.append(a.toString()).append(b),
(a, b) -> a.append(a.toString()).append(b))); // BROKEN -- NOT ASSOCIATIVE
Выполняется последовательно, это дает желаемый результат:
aabaabcaabaabcdaabaabcaabaabcde
Но при параллельном запуске может возникнуть нечто вроде этого:
aabaabccdde
Так как он "работает" последовательно, мы можем обеспечить его выполнение, вызвав sequential()
и вернемся к нему, если комбайнер выдаст исключение. Кроме того, поставщик должен вызываться ровно один раз. Невозможно объединить промежуточные результаты, поэтому, если поставщик вызывается дважды, у нас уже проблемы. Но поскольку мы "знаем", поставщик вызывается только один раз в последовательном режиме, большинство людей не беспокоится об этом. Фактически, я видел, как люди пишут "поставщики", которые возвращают какой-то существующий объект вместо создания нового, в нарушение контракта поставщика.
При этом использовании формы 3-arg collect()
у нас есть две из трех функций, нарушающих их контракты. Разве это не должно говорить нам делать что-то по-другому?
Основная работа здесь выполняется с помощью функции аккумулятора. Чтобы выполнить уменьшение стиля сгиба, мы можем применить эту функцию в строгом порядке слева направо, используя forEachOrdered()
. Мы должны сделать немного настройки и завершения кода до и после, но это не проблема:
StringBuilder a = new StringBuilder();
list.parallelStream()
.forEachOrdered(b -> a.append(a.toString()).append(b));
System.out.println(a.toString());
Естественно, это работает отлично параллельно, хотя преимущества параллельной работы могут быть несколько сведены на нет требованиями порядка forEachOrdered()
.
В общем, если вы хотите, чтобы вы выполняли изменчивое сокращение, но вам не хватает функции ассоциативного объединителя, что позволяет ограничить поток последовательным выполнением, переделать проблему как операцию с левой стороны и использовать forEachRemaining()
на вашей функции аккумулятора.