Ответ 1
Проблема в том, что текущая реализация Stream API вместе с текущей реализацией IteratorSpliterator
для источника неизвестного размера плохо разбивает такие источники на параллельные задачи. Вам повезло, что у вас более 1024 файлов, иначе у вас не было бы никакого преимущества по распараллеливанию. В реализации Current Stream API учитывается значение estimateSize()
, возвращаемое из Spliterator
. IteratorSpliterator
неизвестного размера возвращает Long.MAX_VALUE
до разделения, и его суффикс всегда возвращает Long.MAX_VALUE
. Его стратегия разделения следующая:
- Определите текущий размер партии. Текущая формула должна начинаться с 1024 элементов и увеличиваться арифметически (2048, 3072, 4096, 5120 и т.д.) До достижения размера
MAX_BATCH
(что составляет 33554432 элемента). - Потребляйте входные элементы (в вашем случае "Пути" ) в массив до тех пор, пока размер партии не будет достигнут или вход исчерпан.
- Верните
ArraySpliterator
итерацию по созданному массиву в качестве префикса, оставив себя как суффикс.
Предположим, у вас есть 7000 файлов. Stream API запрашивает оценочный размер, IteratorSpliterator
возвращает Long.MAX_VALUE
. Итак, Stream API запрашивает IteratorSpliterator
для разделения, он собирает 1024 элемента из базового DirectoryStream
в массив и разбивается на ArraySpliterator
(с оцененным размером 1024) и сам (с оценочным размером, который все еще Long.MAX_VALUE
), Поскольку Long.MAX_VALUE
намного больше 1024, Stream API решает продолжить разделение большей части, даже не пытаясь разделить меньшую часть. Таким образом, общее дерево расщепления выглядит следующим образом:
IteratorSpliterator (est. MAX_VALUE elements)
| |
ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements)
| |
/---------------/ |
| |
ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements)
|
(split returns null: refuses to split anymore)
Итак, после этого у вас есть пять параллельных задач, которые должны выполняться: на самом деле они содержат 1024, 2048, 3072, 856 и 0 элементов. Обратите внимание, что хотя последний элемент имеет 0 элементов, он по-прежнему сообщает, что он имеет приблизительно Long.MAX_VALUE
элементы, поэтому Stream API также отправит его в ForkJoinPool
. Плохо то, что Stream API считает, что дальнейшее расщепление первых четырех задач бесполезно, поскольку их расчетный размер намного меньше. Таким образом, вы получаете очень неравномерное разбиение ввода, в котором используются четыре ядра процессора max (даже если у вас гораздо больше). Если обработка каждого элемента занимает примерно одно и то же время для любого элемента, тогда весь процесс будет ждать завершения самой большой части (3072 элемента). Таким образом, максимальное ускорение у вас может быть 7000/3072 = 2,28x. Таким образом, если последовательная обработка занимает 41 секунду, то параллельный поток будет занимать около 41/2,28 = 18 секунд (что близко к вашим фактическим номерам).
Ваше решение для работы полностью прекрасное. Обратите внимание, что с помощью Files.list().parallel()
у вас также есть все элементы ввода Path
, хранящиеся в памяти (в объектах ArraySpliterator
). Таким образом, вы не будете тратить больше памяти, если вручную сбросить их в List
. Реализованные с помощью массива реализации, такие как ArrayList
(который в настоящее время создается Collectors.toList()
), могут разбиваться равномерно без каких-либо проблем, что приводит к дополнительному ускорению.
Почему такой случай не оптимизирован? Конечно, это не невозможная проблема (хотя реализация может быть довольно сложной). Похоже, что это не приоритетная проблема для разработчиков JDK. В рассылках было несколько дискуссий по этой теме. Вы можете прочитать сообщение Пола Сандоса здесь, где он комментирует мои усилия по оптимизации.