Ответ 1
Каноническое решение было бы
public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ForkJoinPool pool = new ForkJoinPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}
Обратите внимание, что взаимодействие между пулом ForkJoin и параллельными потоками - это неопределенная реализация, на которую вы не должны полагаться. Напротив, CompletableFuture
предоставляет выделенный API для обеспечения Executor
. Это даже не должно быть ForkJoinPool
:
public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new));
} finally {
pool.shutdown();
}
}
В любом случае вы должны явно отключить исполнителя, вместо того чтобы полагаться на автоматическую очистку.
Если вам нужен результат F.Promise<Void>
, вы можете использовать
public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
}
Но обратите внимание, что это, как и ваш исходный код, возвращается только после завершения операции, а методы, возвращающие CompletableFuture
, позволяют выполнять операции асинхронно до тех пор, пока вызывающий абонент не вызовет join
или get
.
Чтобы вернуть истинно асинхронный Promise
, вы должны обернуть всю операцию, например.
public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
return F.Promise.pure(stream).flatMap(items -> {
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
return CompletableFuture.allOf(
items.map(CompletableFuture::completedFuture)
.map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
.toArray(CompletableFuture<?>[]::new))
.handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
.join();
} finally {
pool.shutdown();
}
});
}
Но лучше выбрать один API вместо того, чтобы перепрыгивать назад и вперед между двумя различными API.