Строки чтения #() плохо распараллеливаются из-за неконфигурируемой политики размера партии в ее разделителе
Я не могу добиться хорошего распараллеливания обработки потока, когда источником потока является Reader
. Запустив код ниже на четырехъядерном процессоре, я наблюдаю сначала 3 ядра, затем внезапное падение до двух, затем одного ядра. Общее использование ЦП составляет около 50%.
Обратите внимание на следующие характеристики примера:
- всего 6000 строк;
- каждая строка занимает около 20 мс для обработки;
- вся процедура занимает около минуты.
Это означает, что все давление на CPU и минимальное значение ввода-вывода. Примером является сидячая утка для автоматического распараллеливания.
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
... class imports elided ...
public class Main
{
static final AtomicLong totalTime = new AtomicLong();
public static void main(String[] args) throws IOException {
final long start = System.nanoTime();
final Path inputPath = createInput();
System.out.println("Start processing");
try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
Files.lines(inputPath).parallel().map(Main::processLine)
.forEach(w::println);
}
final double cpuTime = totalTime.get(),
realTime = System.nanoTime()-start;
final int cores = Runtime.getRuntime().availableProcessors();
System.out.println(" Cores: " + cores);
System.out.format(" CPU time: %.2f s\n", cpuTime/SECONDS.toNanos(1));
System.out.format(" Real time: %.2f s\n", realTime/SECONDS.toNanos(1));
System.out.format("CPU utilization: %.2f%%", 100.0*cpuTime/realTime/cores);
}
private static String processLine(String line) {
final long localStart = System.nanoTime();
double ret = 0;
for (int i = 0; i < line.length(); i++)
for (int j = 0; j < line.length(); j++)
ret += Math.pow(line.charAt(i), line.charAt(j)/32.0);
final long took = System.nanoTime()-localStart;
totalTime.getAndAdd(took);
return NANOSECONDS.toMillis(took) + " " + ret;
}
private static Path createInput() throws IOException {
final Path inputPath = Paths.get("input.txt");
try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
for (int i = 0; i < 6_000; i++) {
final String text = String.valueOf(System.nanoTime());
for (int j = 0; j < 25; j++) w.print(text);
w.println();
}
}
return inputPath;
}
}
Мой типичный вывод:
Cores: 4
CPU time: 110.23 s
Real time: 53.60 s
CPU utilization: 51.41%
Для сравнения, если я использую слегка измененный вариант, где я сначала собираю в список, а затем обрабатываю:
Files.lines(inputPath).collect(toList()).parallelStream().map(Main::processLine)
.forEach(w::println);
Я получаю этот типичный вывод:
Cores: 4
CPU time: 138.43 s
Real time: 35.00 s
CPU utilization: 98.87%
Что может объяснить этот эффект и как я могу обойти его для полного использования?
Обратите внимание, что я изначально наблюдал это на считывателе входного потока сервлета, поэтому он не специфичен для FileReader
.
Ответы
Ответ 1
Вот ответ, прописанный в исходном коде Spliterators.IteratorSpliterator
, который используется BufferedReader#lines()
:
@Override
public Spliterator<T> trySplit() {
/*
* Split into arrays of arithmetically increasing batch
* sizes. This will only improve parallel performance if
* per-element Consumer actions are more costly than
* transferring them into an array. The use of an
* arithmetic progression in split sizes provides overhead
* vs parallelism bounds that do not particularly favor or
* penalize cases of lightweight vs heavyweight element
* operations, across combinations of #elements vs #cores,
* whether or not either are known. We generate
* O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
* potential speedup.
*/
Iterator<? extends T> i;
long s;
if ((i = it) == null) {
i = it = collection.iterator();
s = est = (long) collection.size();
}
else
s = est;
if (s > 1 && i.hasNext()) {
int n = batch + BATCH_UNIT;
if (n > s)
n = (int) s;
if (n > MAX_BATCH)
n = MAX_BATCH;
Object[] a = new Object[n];
int j = 0;
do { a[j] = i.next(); } while (++j < n && i.hasNext());
batch = j;
if (est != Long.MAX_VALUE)
est -= j;
return new ArraySpliterator<>(a, 0, j, characteristics);
}
return null;
}
Также следует отметить константы:
static final int BATCH_UNIT = 1 << 10; // batch array size increment
static final int MAX_BATCH = 1 << 25; // max batch array size;
Итак, в моем примере, когда я использую 6000 элементов, я получаю всего три партии, потому что шаг размера партии равен 1024. Это точно объясняет мое наблюдение, что изначально используются три ядра, отбрасывая до двух, а затем один, поскольку более мелкие партии завершены, Тем временем я попробовал модифицированный пример с 60 000 элементов, а затем я получаю почти 100% загрузки процессора.
Чтобы решить мою проблему, я разработал приведенный ниже код, который позволяет мне превратить любой существующий поток в тот, чей Spliterator#trySplit
будет разбивать его на партии заданного размера. Самый простой способ использовать его для варианта использования из моего вопроса:
toFixedBatchStream(Files.newBufferedReader(inputPath).lines(), 20)
На более низком уровне класс ниже представляет собой оболочку spliterator, которая изменяет поведение обернутого разделителя trySplit
и оставляет другие аспекты неизменными.
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
private final Spliterator<T> spliterator;
private final int batchSize;
private final int characteristics;
private long est;
public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
final int c = toWrap.characteristics();
this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
this.spliterator = toWrap;
this.est = est;
this.batchSize = batchSize;
}
public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
this(toWrap, toWrap.estimateSize(), batchSize);
}
public static <T> Stream<T> toFixedBatchStream(Stream<T> in, int batchSize) {
return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
}
@Override public Spliterator<T> trySplit() {
final HoldingConsumer<T> holder = new HoldingConsumer<>();
if (!spliterator.tryAdvance(holder)) return null;
final Object[] a = new Object[batchSize];
int j = 0;
do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
if (est != Long.MAX_VALUE) est -= j;
return spliterator(a, 0, j, characteristics());
}
@Override public boolean tryAdvance(Consumer<? super T> action) {
return spliterator.tryAdvance(action);
}
@Override public void forEachRemaining(Consumer<? super T> action) {
spliterator.forEachRemaining(action);
}
@Override public Comparator<? super T> getComparator() {
if (hasCharacteristics(SORTED)) return null;
throw new IllegalStateException();
}
@Override public long estimateSize() { return est; }
@Override public int characteristics() { return characteristics; }
static final class HoldingConsumer<T> implements Consumer<T> {
Object value;
@Override public void accept(T value) { this.value = value; }
}
}
Ответ 2
Эта проблема в некоторой степени устранена в ранних версиях доступа Java-9. Files.lines
был переписан, и теперь после его разделения он фактически перескакивает в середину файла с отображением памяти. Здесь результаты на моей машине (которая имеет 4 ядра HyperThreading = 8 аппаратных потоков):
Java 8u60:
Start processing
Cores: 8
CPU time: 73,50 s
Real time: 36,54 s
CPU utilization: 25,15%
Java 9b82:
Start processing
Cores: 8
CPU time: 79,64 s
Real time: 10,48 s
CPU utilization: 94,95%
Как вы можете видеть, как в реальном времени, так и в использовании ЦП значительно улучшены.
Однако эта оптимизация имеет некоторые ограничения. В настоящее время он работает только для нескольких кодировок (а именно UTF-8, ISO_8859_1 и US_ASCII), так как для произвольной кодировки вы точно не знаете, как разбить линию кодируется. Он ограничивается файлами размером не более 2 ГБ (из-за ограничений MappedByteBuffer
в Java) и, конечно же, не работает для некоторых нерегулярных файлов (таких как символьные устройства, именованные каналы, которые не могут быть отображены в память). В таких случаях старая реализация используется как резерв.
Ответ 3
Параллельное выполнение потоков основано на модели fork-join. Для упорядоченных потоков параллельное выполнение работает только, если поток можно разбить на части, строго следуя друг за другом. В общем, это невозможно с потоками, генерируемыми BufferedReader. Однако теоретически параллельное выполнение должно быть возможным для неупорядоченных потоков:
BufferedReader reader = ...;
reader.lines().unordered().map(...);
Я не уверен, поддерживает ли поток, возвращенный BufferedReader, такое параллельное выполнение. Очень простая альтернатива - создать промежуточный список:
BufferedReader reader = ...;
reader.lines().collect(toList()).parallelStream().map(...);
В этом случае параллельное выполнение начинается после того, как все строки были прочитаны. Это может быть проблемой, если чтение строк занимает много времени. В этом случае я рекомендую использовать ExecutorService для параллельного выполнения вместо параллельных потоков:
ExecutorService executor = ...;
BufferedReader reader = ...;
reader.lines()
.map(line -> executor.submit(() -> ... line ...))
.collect(toList())
.stream()
.map(future -> future.get())
.map(...);
Ответ 4
Чтобы найти истинную причину этого, вам нужно вставить в источник Files.lines()
, который вызывает BufferedReader.lines()
, который выглядит следующим образом:
public Stream<String> lines() {
Iterator<String> iter = new Iterator<String>() {
String nextLine = null;
@Override
public boolean hasNext() {
if (nextLine != null) {
return true;
} else {
try {
nextLine = readLine();
return (nextLine != null);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
@Override
public String next() {
if (nextLine != null || hasNext()) {
String line = nextLine;
nextLine = null;
return line;
} else {
throw new NoSuchElementException();
}
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL), false);
}
Здесь он возвращает Stream<String>
, который:
- Неизвестный размер
- Заказал
- Не null
- Непараллельно (аргумент
false
в конце StreamSupport.stream()
И, следовательно, я действительно не уверен в том, что он даже подлежит параллелизму, это можно найти, еще более углубившись в источник.
Что я знаю do, так это то, что параллельные потоки явно предоставляются в Java API. Возьмем, например, List
, он имеет метод List.stream()
и List.parallelStream()
.