Как прервать базовое выполнение CompletableFuture

Я знаю, что дизайн CompletableFuture не контролирует его выполнение с перерывами, но, полагаю, некоторые из вас могут столкнуться с этой проблемой. CompletableFuture - очень хороший способ выполнить асинхронное выполнение, но если вы хотите, чтобы основное выполнение было прервано или остановлено, когда будущее отменено, как мы это сделаем? Или мы просто должны признать, что любой отмененный или выполненный вручную CompletableFuture не повлияет на поток, работающий там, чтобы завершить его?

То есть, на мой взгляд, очевидно бесполезная работа, требующая времени исполнителя-исполнителя. Интересно, какой подход или дизайн может помочь в этом случае?

ОБНОВЛЕНИЕ

Вот простой тест для этого

public class SimpleTest {

  @Test
  public void testCompletableFuture() throws Exception {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(()->longOperation());

    bearSleep(1);

    //cf.cancel(true);
    cf.complete(null);

    System.out.println("it should die now already");
    bearSleep(7);
  }

  public static void longOperation(){
    System.out.println("started");
    bearSleep(5);
    System.out.println("completed");
  }

  private static void bearSleep(long seconds){
    try {
      TimeUnit.SECONDS.sleep(seconds);
    } catch (InterruptedException e) {
      System.out.println("OMG!!! Interrupt!!!");
    }
  }
}

Ответы

Ответ 1

A CompletableFuture не связано с асинхронным действием, которое может в конечном итоге завершить его.

Так как (в отличие от FutureTask) этот класс не имеет прямого контроля над вычисление, которое вызывает его завершение, аннулирование рассматривается как просто еще одна форма исключительного завершения. Метод cancel имеет такой же эффект, как completeExceptionally(new CancellationException()).

Возможно, даже не существует отдельного потока, который работает над его завершением (на нем может быть много потоков). Даже если есть, нет ссылки от CompletableFuture на любой поток, который имеет ссылку на него.

Как таковой, вы ничего не можете сделать через CompletableFuture, чтобы прервать любой поток, который может выполнять какую-то задачу, которая завершит его. Вам придется написать свою собственную логику, которая отслеживает любые экземпляры Thread, которые получают ссылку на CompletableFuture с намерением завершить ее.


Вот пример типа исполнения, который, я думаю, вы могли бы избежать.

public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(1);
    CompletableFuture<String> completable = new CompletableFuture<>();
    Future<?> future = service.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                if (Thread.interrupted()) {
                    return; // remains uncompleted
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    return; // remains uncompleted
                }
            }
            completable.complete("done");
        }
    });

    Thread.sleep(2000);

    // not atomic across the two
    boolean cancelled = future.cancel(true);
    if (cancelled)
        completable.cancel(true); // may not have been cancelled if execution has already completed
    if (completable.isCancelled()) {
        System.out.println("cancelled");
    } else if (completable.isCompletedExceptionally()) {
        System.out.println("exception");
    } else {
        System.out.println("success");
    }
    service.shutdown();
}

Это предполагает, что выполняемая задача настроена правильно обрабатывать прерывания.

Ответ 2

Как насчет этого?

public static <T> CompletableFuture<T> supplyAsync(final Supplier<T> supplier) {

    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    final CompletableFuture<T> cf = new CompletableFuture<T>() {
        @Override
        public boolean complete(T value) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            if (isDone()) {
                return false;
            }
            executorService.shutdownNow();
            return super.completeExceptionally(ex);
        }
    };

    // submit task
    executorService.submit(() -> {
        try {
            cf.complete(supplier.get());
        } catch (Throwable ex) {
            cf.completeExceptionally(ex);
        }
    });

    return cf;
}

Простой тест:

    CompletableFuture<String> cf = supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (Exception e) {
            System.out.println("got interrupted");
            return "got interrupted";
        }
        System.out.println("normal complete");
        return "normal complete";
    });

    cf.complete("manual complete");
    System.out.println(cf.get());

Мне не нравится идея создания службы-исполнителя каждый раз, но, возможно, вы можете найти способ повторно использовать ForkJoinPool.

Ответ 3

Пожалуйста, см. мой ответ на соответствующий вопрос: Преобразование будущего Java в CompletedFuture

В приведенном здесь коде поведение CompletionStage добавляется в подкласс RunnableFuture (используется реализациями ExecutorService), поэтому вы можете прервать его правильным способом.