Использование параллельного потока для возврата самого быстрого поставленного значения
У меня есть набор поставщиков, которые все дают один и тот же результат, но с другой (и переменной) скоростью.
Я хочу элегантный способ начать работу с поставщиками одновременно, и как только один из них получит значение, верните его (отбрасывая другие результаты).
Я попытался использовать параллельные потоки и Stream.findAny()
для этого, но он всегда блокируется до тех пор, пока не будут созданы все результаты.
Здесь unit test, демонстрирующий мою проблему:
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.junit.Assert.*;
public class RaceTest {
@Test
public void testRace() {
// Set up suppliers
Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
try {
Thread.sleep(10_000);
return "slow";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}); // This supplier takes 10 seconds to produce a value
Stream<Supplier<String>> stream = suppliers.parallelStream();
assertTrue(stream.isParallel()); // Stream can work in parallel
long start = System.currentTimeMillis();
Optional<String> winner = stream
.map(Supplier::get)
.findAny();
long duration = System.currentTimeMillis() - start;
assertTrue(winner.isPresent()); // Some value was produced
assertEquals("fast", winner.get()); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
}
}
Результат теста заключается в том, что последнее утверждение не выполняется, так как весь тест занимает около 10 секунд.
Что я здесь делаю неправильно?
Ответы
Ответ 1
В этом случае вам лучше использовать Callable
вместо Supplier
(такую же функциональную подпись) и использовать старый добрый API concurrency, который существует с Java 5:
Set<Callable<String>> suppliers=new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
Thread.sleep(10_000);
return "slow";
}
);
ExecutorService es=Executors.newCachedThreadPool();
try {
String result = es.invokeAny(suppliers);
System.out.println(result);
} catch (InterruptedException|ExecutionException ex) {
Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();
Обратите внимание, как весь "запустить все и вернуть самый быстрый" становится единственным вызовом метода...
У него также есть бонус отмены/прерывания всех ожидающих операций, как только один результат будет доступен, поэтому медленная операция обычно не будет ждать здесь всего десяти секунд (ну, в большинстве случаев, поскольку время не является детерминированным).
Ответ 2
Используемый вами код является недетерминированным. Цитирование Javadoc findAny()
:
Поведение этой операции явно недетерминировано; он может свободно выбирать любой элемент в потоке.
Вы можете использовать CompletionService
и передать все задачи на него. Затем CompletionService.take()
вернет Future
первой завершенной задачи.
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
Ответ 3
Stream API не подходит для таких задач, которые не гарантируют, когда задачи будут завершены. Лучшим решением было бы использовать CompletableFuture
:
long start = System.currentTimeMillis();
String winner = CompletableFuture
.anyOf(suppliers.stream().map(CompletableFuture::supplyAsync)
.toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
Обратите внимание, что он все равно не может запускать всех поставщиков параллельно, если общий FJP не имеет уровня parallelism. Чтобы исправить это, вы можете создать свой собственный пул, который имеет необходимый уровень parallelism:
long start = System.currentTimeMillis();
ForkJoinPool fjp = new ForkJoinPool(suppliers.size());
String winner = CompletableFuture
.anyOf(suppliers.stream().map(s -> CompletableFuture.supplyAsync(s, fjp))
.toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
fjp.shutdownNow();