Почему параллельный поток Files.list() работает намного медленнее, чем использование Collection.parallelStream()?

Следующий фрагмент кода является частью метода, который получает список каталогов, вызывает метод извлечения для каждого файла и сериализует полученный объект препарата в xml.

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .parallel()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

Вот тот же самый код, который делает то же самое, но с помощью простого вызова .list() для получения списка каталогов и вызова .parallelStream() в результирующем списке.

Arrays.asList(infoDir.toFile().list())
    .parallelStream()
    .map(f -> infoDir.resolve(f))
    .map(this::extract)
    .forEachOrdered(drug -> {
        try {
            marshaller.write(drug);
        } catch (JAXBException ex) {
            ex.printStackTrace();
    }
});

Моя машина - это четырехъядерный MacBook Pro, Java v 1.8.0_60 (build 1.8.0_60-b27).

Я обрабатываю ~ 7000 файлов. Средние из 3 пробегов:

Первая версия: С .parallel(): 20 секунд. Без .parallel(): 41 секунд

Вторая версия: С .parallelStream(): 12 секунд. С .stream(): 41 секунда.

Те 8 секунд в параллельном режиме кажутся огромной разницей, учитывая, что метод extract, который читает из потока, и делает всю тяжелую работу и вызов write, выполняющий окончательную запись, не изменяется.

Ответы

Ответ 1

Проблема в том, что текущая реализация Stream API вместе с текущей реализацией IteratorSpliterator для источника неизвестного размера плохо разбивает такие источники на параллельные задачи. Вам повезло, что у вас более 1024 файлов, иначе у вас не было бы никакого преимущества по распараллеливанию. В реализации Current Stream API учитывается значение estimateSize(), возвращаемое из Spliterator. IteratorSpliterator неизвестного размера возвращает Long.MAX_VALUE до разделения, и его суффикс всегда возвращает Long.MAX_VALUE. Его стратегия разделения следующая:

  • Определите текущий размер партии. Текущая формула должна начинаться с 1024 элементов и увеличиваться арифметически (2048, 3072, 4096, 5120 и т.д.) До достижения размера MAX_BATCH (что составляет 33554432 элемента).
  • Потребляйте входные элементы (в вашем случае "Пути" ) в массив до тех пор, пока размер партии не будет достигнут или вход исчерпан.
  • Верните ArraySpliterator итерацию по созданному массиву в качестве префикса, оставив себя как суффикс.

Предположим, у вас есть 7000 файлов. Stream API запрашивает оценочный размер, IteratorSpliterator возвращает Long.MAX_VALUE. Итак, Stream API запрашивает IteratorSpliterator для разделения, он собирает 1024 элемента из базового DirectoryStream в массив и разбивается на ArraySpliterator (с оцененным размером 1024) и сам (с оценочным размером, который все еще Long.MAX_VALUE), Поскольку Long.MAX_VALUE намного больше 1024, Stream API решает продолжить разделение большей части, даже не пытаясь разделить меньшую часть. Таким образом, общее дерево расщепления выглядит следующим образом:

                     IteratorSpliterator (est. MAX_VALUE elements)
                           |                    |
ArraySpliterator (est. 1024 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 2048 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 3072 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 856 elements)    IteratorSpliterator (est. MAX_VALUE elements)
                                                    |
                                        (split returns null: refuses to split anymore)

Итак, после этого у вас есть пять параллельных задач, которые должны выполняться: на самом деле они содержат 1024, 2048, 3072, 856 и 0 элементов. Обратите внимание, что хотя последний элемент имеет 0 элементов, он по-прежнему сообщает, что он имеет приблизительно Long.MAX_VALUE элементы, поэтому Stream API также отправит его в ForkJoinPool. Плохо то, что Stream API считает, что дальнейшее расщепление первых четырех задач бесполезно, поскольку их расчетный размер намного меньше. Таким образом, вы получаете очень неравномерное разбиение ввода, в котором используются четыре ядра процессора max (даже если у вас гораздо больше). Если обработка каждого элемента занимает примерно одно и то же время для любого элемента, тогда весь процесс будет ждать завершения самой большой части (3072 элемента). Таким образом, максимальное ускорение у вас может быть 7000/3072 = 2,28x. Таким образом, если последовательная обработка занимает 41 секунду, то параллельный поток будет занимать около 41/2,28 = 18 секунд (что близко к вашим фактическим номерам).

Ваше решение для работы полностью прекрасное. Обратите внимание, что с помощью Files.list().parallel() у вас также есть все элементы ввода Path, хранящиеся в памяти (в объектах ArraySpliterator). Таким образом, вы не будете тратить больше памяти, если вручную сбросить их в List. Реализованные с помощью массива реализации, такие как ArrayList (который в настоящее время создается Collectors.toList()), могут разбиваться равномерно без каких-либо проблем, что приводит к дополнительному ускорению.

Почему такой случай не оптимизирован? Конечно, это не невозможная проблема (хотя реализация может быть довольно сложной). Похоже, что это не приоритетная проблема для разработчиков JDK. В рассылках было несколько дискуссий по этой теме. Вы можете прочитать сообщение Пола Сандоса здесь, где он комментирует мои усилия по оптимизации.

Ответ 2

В качестве альтернативы вы можете использовать этот специальный разделитель, специально предназначенный для DirectoryStream:

public class DirectorySpliterator implements Spliterator<Path> {
    Iterator<Path> iterator;
    long est;

    private DirectorySpliterator(Iterator<Path> iterator, long est) {
        this.iterator = iterator;
        this.est = est;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Path> action) {
        if (iterator == null) {
            return false;
        }
        Path path;
        try {
            synchronized (iterator) {
                if (!iterator.hasNext()) {
                    iterator = null;
                    return false;
                }
                path = iterator.next();
            }
        } catch (DirectoryIteratorException e) {
            throw new UncheckedIOException(e.getCause());
        }
        action.accept(path);
        return true;
    }

    @Override
    public Spliterator<Path> trySplit() {
        if (iterator == null || est == 1)
            return null;
        long e = this.est >>> 1;
        this.est -= e;
        return new DirectorySpliterator(iterator, e);
    }

    @Override
    public long estimateSize() {
        return est;
    }

    @Override
    public int characteristics() {
        return DISTINCT | NONNULL;
    }

    public static Stream<Path> list(Path parent) throws IOException {
        DirectoryStream<Path> ds = Files.newDirectoryStream(parent);
        int splitSize = Runtime.getRuntime().availableProcessors() * 8;
        DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize);
        return StreamSupport.stream(spltr, false).onClose(() -> {
            try {
                ds.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}

Просто замените Files.list на DirectorySpliterator.list и он будет равномерно распределяться без промежуточной буферизации. Здесь мы используем тот факт, что DirectoryStream создает список каталогов без какого-либо определенного порядка, поэтому каждый параллельный поток просто берет из него последующую запись (синхронно, так как у нас уже есть синхронные операции ввода-вывода, дополнительная синхронизация имеет следующее- ничего накладных). Параллельный порядок будет отличаться каждый раз (даже если используется forEachOrdered), но Files.list() также не гарантирует порядок.

Единственная нетривиальная часть здесь - сколько параллельных задач для создания. Поскольку мы не знаем, сколько файлов в папке, пока мы не пройдем, хорошо использовать availableProcessors() в качестве базы. Я создаю индивидуальные задачи 8 x availableProcessors(), которые кажутся хорошим мелкозернистым/крупнозернистым компромиссом: если обработка на одном элементе неравномерна, имея больше задач, чем процессоры, это поможет сбалансировать нагрузку.

Ответ 3

Другой альтернативой вашему обходному пути является использование .collect(Collectors.toList()).parallelStream() в вашем потоке, например

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .collect(Collectors.toList())
        .parallelStream()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

При этом вам не нужно вызывать .map(f -> infoDir.resolve(f)), и производительность должна быть похожа на ваше второе решение.