Что-то не так с использованием I/O + ManagedBlocker в Java8 parallelStream()?
По умолчанию "paralellStream()" в Java 8 использует общий ForkJoinPool
, который может быть проблемой с задержкой, если общие потоки пулов исчерпаны при отправке задачи. Однако во многих случаях достаточно мощности процессора, и задачи достаточно короткие, так что это не проблема. Если у нас есть какие-то длительные задачи, это, конечно, потребует тщательного рассмотрения, но для этого вопроса позвольте предположить, что это не проблема.
Однако заполнение ForkJoinPool
задачами ввода-вывода, которые фактически не выполняют работу с привязкой к процессору, - это способ внедрения узкого места, даже если имеется достаточная мощность процессора. Я понял это. Однако для этого мы имеем ManagedBlocker
for. Поэтому, если у нас есть задача ввода-вывода, мы должны просто разрешить ForkJoinPool
управлять тем, что находится внутри ManagedBlocker
. Это звучит невероятно просто. Однако, к моему удивлению, использование ManagedBlocker
- довольно сложный API для простой вещи. И ведь я думаю, что это обычная проблема. Поэтому я просто создал простой утилитный метод, который упрощает использование ManagedBlocker
для обычного случая:
public class BlockingTasks {
public static<T> T callInManagedBlock(final Supplier<T> supplier) {
final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
try {
ForkJoinPool.managedBlock(managedBlock);
} catch (InterruptedException e) {
throw new Error(e);
}
return managedBlock.getResult();
}
private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
private final Supplier<T> supplier;
private T result;
private boolean done = false;
private SupplierManagedBlock(final Supplier<T> supplier) {
this.supplier = supplier;
}
@Override
public boolean block() {
result = supplier.get();
done = true;
return true;
}
@Override
public boolean isReleasable() {
return done;
}
public T getResult() {
return result;
}
}
}
Теперь, если я хочу загрузить html-код пары веб-сайтов в paralell, я мог бы это сделать без ввода-вывода, вызывающего любую проблему:
public static void main(String[] args) {
final List<String> pagesHtml = Stream
.of("https://google.com", "https://stackoverflow.com", "...")
.map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
.collect(Collectors.toList());
}
Я немного удивлен, что нет класса, такого как BlockingTasks
выше, поставляемого с Java (или я его не нашел?), но его было не так сложно построить.
Когда я google для "java 8 parallel stream", я получаю в первых четырех результатах те статьи, которые утверждают, что из-за проблемы ввода-вывода Fork/Join отстой в Java:
Я несколько изменил свои поисковые термины, и пока многие люди жалуются на то, как ужасная жизнь, я нашел, что никто не говорит о таком решении, как выше. Поскольку я не чувствую себя Марвином (мозг, как планета), и Java 8 доступен довольно долго, я подозреваю, что есть что-то ужасно неправильное в том, что я предлагаю там.
Я собрал небольшое испытание:
public static void main(String[] args) {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}
public static void sleep() {
try {
System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new Error(e);
}
}
Я побежал, что получил следующий результат:
18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End
Итак, на моем 8-процессорном компьютере ForkJoinPool
естественно выбирает 8 потоков, завершает первые 8 задач и, наконец, последние две задачи, что означает, что это заняло 20 секунд, и если бы были поставлены другие задачи, пул все равно не мог бы использоваться явно простаивающие процессоры (за исключением 6 ядер за последние 10 секунд).
Затем я использовал...
IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));
... вместо...
IntStream.range(0, 10).parallel().forEach((x) -> sleep());
... и получил следующий результат:
18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End
Мне кажется, что это работает, добавлены дополнительные потоки, чтобы компенсировать мое ложное "блокирование действия ввода-вывода" (сон). Время было сокращено до 10 секунд, и я полагаю, что если бы я поставил в очередь больше задач, которые могли бы использовать доступную мощность процессора.
Что-то не так с этим решением или вообще с использованием ввода-вывода в потоках, если операция ввода-вывода завернута в ManagedBlock
?
Ответы
Ответ 1
Короче говоря, есть проблемы с вашим решением. Это определенно улучшает использование кода блокировки внутри параллельного потока, а некоторые сторонние библиотеки предоставляют аналогичное решение (см., Например, Blocking
класс в jOOλ библиотека). Однако это решение не изменяет стратегию внутреннего разделения, используемую в Stream API. Количество подзадач, созданных Stream API, контролируется предопределенной константой в классе AbstractTask
:
/**
* Default target factor of leaf tasks for parallel decomposition.
* To allow load balancing, we over-partition, currently to approximately
* four tasks per processor, which enables others to help out
* if leaf tasks are uneven or some processors are otherwise busy.
*/
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
Как вы можете видеть, он в четыре раза больше, чем общий пул parallelism (который по умолчанию является числом ядер процессора). Реальный алгоритм расщепления немного сложнее, но примерно у вас не может быть больше 4x-8x задач, даже если все они блокируются.
Например, если у вас 8 ядер процессора, ваш тест Thread.sleep()
будет хорошо работать до IntStream.range(0, 32)
(как 32 = 8 * 4). Однако для IntStream.range(0, 64)
у вас будет 32 параллельных задания, каждая из которых обрабатывает два входных числа, поэтому вся обработка займет 20 секунд, а не 10.