Как объединить 3 или более CompletionStages?
Если есть 2 CompletionStages, я могу объединить их с методом thenCombine
:
CompletionStage<A> aCompletionStage = getA();
CompletionStage<B> bCompletionStage = getB();
CompletionStage<Combined> combinedCompletionStage =
aCompletionStage.thenCombine(bCompletionStage, (aData, bData) -> combine(aData, bData));
Если у меня есть 3 или более CompletionStages, я могу создать цепочку методов thenCombine
, но я должен использовать временные объекты для передачи результатов. Например, вот решение, использующее Pair
и Triple
из пакета org.apache.commons.lang3.tuple
:
CompletionStage<A> aCompletionStage = getA();
CompletionStage<B> bCompletionStage = getB();
CompletionStage<C> cCompletionStage = getC();
CompletionStage<D> dCompletionStage = getD();
CompletionStage<Combined> combinedDataCompletionStage =
aCompletionStage.thenCombine(bCompletionStage, (Pair::of))
.thenCombine(cCompletionStage, (ab, c) ->
Triple.of(ab.getLeft(), ab.getRight(), c))
.thenCombine(dCompletionStage, (abc, d) ->
combine(abc.getLeft(), abc.getMiddle(), abc.getRight(), d));
Есть ли лучший способ комбинировать результаты с несколькими CompletionStages?
Ответы
Ответ 1
Единственный способ объединить несколько этапов, которые хорошо масштабируются с растущим числом этапов, заключается в использовании CompletableFuture
. Если ваш CompletionStage
arent CompletableFuture
, вы все равно можете преобразовать их, используя .toCompletableFuture()
:
CompletableFuture<A> aCompletionStage = getA().toCompletableFuture();
CompletableFuture<B> bCompletionStage = getB().toCompletableFuture();
CompletableFuture<C> cCompletionStage = getC().toCompletableFuture();
CompletableFuture<D> dCompletionStage = getD().toCompletableFuture();
CompletionStage<Combined> combinedDataCompletionStage = CompletableFuture.allOf(
aCompletionStage, bCompletionStage, cCompletionStage, dCompletionStage)
.thenApply(ignoredVoid -> combine(
aCompletionStage.join(), bCompletionStage.join(),
cCompletionStage.join(), dCompletionStage.join()) );
Это содержит больше шаблонов, чем объединение двух этапов с помощью thenCombine
, но шаблонная плита не растет при добавлении к ней дополнительных этапов.
Обратите внимание, что даже с вашим оригинальным подходом thenCombine
вам не нужно a Triple
, a Pair
достаточно:
CompletionStage<Combined> combinedDataCompletionStage =
aCompletionStage.thenCombine(bCompletionStage, (Pair::of)).thenCombine(
cCompletionStage.thenCombine(dCompletionStage, Pair::of),
(ab, cd) -> combine(ab.getLeft(), ab.getRight(), cd.getLeft(), cd.getRight()));
Тем не менее, он не масштабируется, если вы хотите объединить больше этапов.
Совместное решение (относительно сложности) может быть:
CompletionStage<Combined> combinedDataCompletionStage = aCompletionStage.thenCompose(
a -> bCompletionStage.thenCompose(b -> cCompletionStage.thenCompose(
c -> dCompletionStage.thenApply(d -> combine(a, b, c, d)))));
Это проще в своей структуре, но все еще не масштабируется с большим количеством этапов.
Ответ 2
Третий ответ Холгера может быть немного короче:
CompletionStage<Combined> combinedDataCompletionStage = aCompletionStage.thenCompose(
a -> bCompletionStage.thenCompose(
b -> cCompletionStage.thenCombine(dCompletionStage,
(c, d) -> combine(a, b, c, d))));
Ответ 3
Я думаю, что вы должны использовать промежуточный объект, но один из ваших собственных вместо использования Pair
и Tuple
public R method() {
CompletableFuture<A> aFuture = getAFuture();
CompletableFuture<B> bFuture = getBFuture();
CompletableFuture<C> cFuture = getCFuture();
CompletableFuture<D> dFuture = getDFuture();
return CompletableFuture.completedFuture(new WellNamedResultHolder())
.thenCombineAsync(aFuture, WellNamedResultHolder::withAResult)
.thenCombineAsync(bFuture, WellNamedResultHolder::withBResult)
.thenCombineAsync(cFuture, WellNamedResultHolder::withCResult)
.thenCombineAsync(dFuture, WellNamedResultHolder::withDResult)
.thenApplyAsync(this::combineAllTheResults);
}
private static class WellNamedResultHolder {
private A aResult;
private B bResult;
private C cResult;
private D dResult;
// Getters
public WellNamedResultHolder withAResult(final A aResult) {
this.aResult = aResult;
return this;
}
public WellNamedResultHolder withBResult(final B bResult) {
this.bResult = bResult;
return this;
}
public WellNamedResultHolder withCResult(final C cResult) {
this.cResult = cResult;
return this;
}
public WellNamedResultHolder withDResult(final D dResult) {
this.dResult = dResult;
return this;
}
}
Фактическая форма держателя результата, очевидно, может измениться в соответствии с вашими потребностями, предоставляя вам большую гибкость. Вы также отвечаете за то, что происходит после завершения этого будущего. Хотя здесь больше шаблонов, вы получаете код, который более наглядно описывает происходящее (какой ломбок может привести в порядок).
Ответ 4
Вы спрашивали о "3 или более", если они есть в списке как CompletableFutures (см. Другие ответы), вы можете использовать этот удобный метод:
private static <T> CompletableFuture<List<T>> join(List<CompletableFuture<T>> executionPromises) {
CompletableFuture<Void> joinedPromise = CompletableFuture.allOf(executionPromises.toArray(CompletableFuture[]::new));
return joinedPromise.thenApply(voit -> executionPromises.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
Он преобразует ваш "список фьючерсов" в "будущее для списка результатов".
Ответ 5
У меня была похожая проблема, но у меня было более 3 завершаемых фьючерсов, поэтому, основываясь на ответе Хольгера, я сделал небольшую универсальную утилиту.
public static <T, R> CompletableFuture<R> allOf(List<CompletableFuture<T>> args, Function<List<T>, R> combiner) {
final Queue<CompletableFuture<T>> queue = new LinkedList<>();
for (CompletableFuture<T> arg : args) {
queue.add(arg);
}
return aggregator(queue, new ArrayList<>(), combiner);
}
private static <T, R> CompletableFuture<R> aggregator(Queue<CompletableFuture<T>> queue, List<T> arg,
Function<List<T>, R> combiner) {
if (queue.size() == 2)
return queue.poll().thenCombine(queue.poll(), (c, d) -> {
arg.add(c);
arg.add(d);
return combiner.apply(arg);
});
return queue.poll().thenCompose(data -> {
arg.add(data);
return aggregator(queue, arg, combiner);
});
}