Отмена с будущим и обещание в Scala
Это продолжение моего предыдущего вопроса.
Предположим, что у меня есть задача, которая выполняет прерывистый блокирующий вызов. Я хотел бы запустить его как Future
и отменить его с помощью метода failure
Promise
.
Я хотел бы, чтобы отмена работала следующим образом:
-
Если кто-то отменил задачу до ее завершения, мне бы хотелось, чтобы задача завершилась "немедленно", прерывая блокирующий вызов, если он уже запущен, и я хотел бы, чтобы Future
вызывал onFailure
.
-
Если отменить задачу после завершения задачи, я хотел бы получить статус, заявляющий, что отмена завершилась с момента завершения задачи.
Имеет ли смысл? Можно ли реализовать в Scala? Есть ли примеры таких реализаций?
Ответы
Ответ 1
scala.concurrent.Future доступен только для чтения, поэтому один читатель не может испортить вещи другим читателям.
Похоже, что вы должны реализовать то, что хотите:
def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
val p = Promise[T]()
val f = p.future
p tryCompleteWith Future(fun(f))
(f, () => p.tryFailure(new CancellationException))
}
val (f, cancel) = cancellableFuture( future => {
while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag
result // when we're done, return some result
})
val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)
Ответ 2
Вот прерванная версия кода Виктора на его комментарии (Виктор, пожалуйста, исправьте меня, если я неверно истолковал).
object CancellableFuture extends App {
def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
val p = Promise[T]()
val f = p.future
val aref = new AtomicReference[Thread](null)
p tryCompleteWith Future {
val thread = Thread.currentThread
aref.synchronized { aref.set(thread) }
try fun() finally {
val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
//Deal with interrupted flag of this thread in desired
}
}
(f, () => {
aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
p.tryFailure(new CancellationException)
})
}
val (f, cancel) = interruptableFuture[Int] { () =>
val latch = new CountDownLatch(1)
latch.await(5, TimeUnit.SECONDS) // Blocks for 5 sec, is interruptable
println("latch timed out")
42 // Completed
}
f.onFailure { case ex => println(ex.getClass) }
f.onSuccess { case i => println(i) }
Thread.sleep(6000) // Set to less than 5000 to cancel
val wasCancelled = cancel()
println("wasCancelled: " + wasCancelled)
}
С Thread.sleep(6000)
вывод:
latch timed out
42
wasCancelled: false
С Thread.sleep(1000)
вывод:
wasCancelled: true
class java.util.concurrent.CancellationException
Ответ 3
Фьючерсы на фьючерсы на фьючерсы на фьючерс. Посмотрите здесь:
https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala
В строке 563 показан абстрактный метод, отвечающий за это. Scala фьючерсы в настоящее время не поддерживают отмену.