Можно ли использовать API Java 8 Streams для асинхронной обработки?
Я играл с CompletionStage/CompletableFuture в Java 8, чтобы выполнить асинхронную обработку, которая работает достаточно хорошо. Однако иногда мне нужна сцена для выполнения асинхронной обработки итератора/потока элементов, и, похоже, не существует способа сделать это.
В частности, Stream.forEach() имеет семантику, которая после вызова всех элементов была обработана. Я хотел бы получить то же самое, но вместо этого с помощью CompletionStage, например:
CompletionStage<Void> done = stream.forEach(...);
done.thenRun(...);
Если поток поддерживается асинхронным потоковым результатом, это будет намного лучше, чем ждать его завершения в самом коде выше.
Возможно ли построить это с помощью существующего Java 8 API? Обходные?
Ответы
Ответ 1
Насколько я знаю, API потоков не поддерживает асинхронную обработку событий. Похоже, что вы хотите что-то вроде Reactive Extensions для .NET, и есть порт Java, называемый RxJava, созданный Netflix.
RxJava поддерживает многие из тех же операций высокого уровня, что и потоки Java 8 (такие как карта и фильтр), и является асинхронным.
Обновить. В настоящее время существует инициатива реактивных потоков, и похоже, что JDK 9 будет включать поддержка хотя бы части его, хотя класс Flow.
Ответ 2
Как указано в @KarolKrol, вы можете сделать это с потоком CompletableFuture
.
Существует библиотека, которая построена поверх потоков JDK8, чтобы облегчить работу с потоками CompletableFuture
под названием cyclops-react.
Чтобы создать свои потоки, вы можете использовать API-интерфейс с быстрым протезом cyclops-реагировать или вы можете использовать простой-реагировать Stage
s.
Ответ 3
cyclops-react (я являюсь автором этой библиотеки), предоставляет StreamUtils для обработки потоков.
Одной из функций, которые он предоставляет, является futureOperations, которая обеспечивает доступ к стандартным операциям терминала Stream (а затем и к некоторым) с помощью твиста - Stream выполняется асинхронно, и результат возвращается внутри CompletableFuture..e.g
Stream<Integer> stream = Stream.of(1,2,3,4,5,6)
.map(i->i+2);
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream,
Executors.newFixedThreadPool(1))
.collect(Collectors.toList());
Существует также класс convience ReactiveSeq, который обертывает Stream и предоставляет те же функциональные возможности, с неплохим белым API
CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6)
.map(i->i+2)
.futureOperations(
Executors.newFixedThreadPool(1))
.collect(Collectors.toList());
Как указал Адам cyclops-react FutureStreams предназначены для обработки данных асинхронно (путем смешивания фьючерсов и потоков вместе) - особенно подходит для многопоточных операций, которые включают блокирование ввода-вывода (например, чтение файлов, создание вызовов в режиме ожидания, вызов отдыха и т.д.).
Ответ 4
Можно создать поток, сопоставить каждый элемент с CompletionStage
и собрать результаты с помощью CompletionStage.thenCombine()
, но полученный код не будет более читабельным, а затем простым способом для этого.
CompletionStage<Collection<Result>> collectionStage =
CompletableFuture.completedFuture(
new LinkedList<>()
);
for (Request request : requests) {
CompletionStage<Result> resultStage = performRequest(request);
collectionStage = collectionStage.thenCombine(
resultStage,
(collection, result) -> {
collection.add(result);
return collection;
}
);
}
return collectionStage;
Этот пример может быть легко преобразован в функциональный для каждого, не теряя удобочитаемости. Но использование потока reduce
или collect
требует дополнительного не очень тонкого кода.
Обновление: CompletableFuture.allOf
и CompletableFuture.join
предоставляют другой, более читаемый способ преобразования коллекции будущих результатов в будущий сбор результатов.