Scala ожидание последовательности фьючерсов
Я надеялся, что код вроде следующего будет ждать обоих фьючерсов, но это не так.
object Fiddle {
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2) // in the general case, this would be a dynamically sized list
val seq = Future.sequence(lf)
seq.onComplete {
_ => lf.foreach(f => println(f.isCompleted))
}
}
val a = FuturesSequence
Я предположил, что seq.onComplete
будет ждать их завершения до завершения, но не так; это приводит к:
true
false
.sequence
было немного сложно найти в источнике scala.concurrent.Future, мне интересно, как реализовать параллель, которая ждет всех исходных фьючерсов (динамически размерной) последовательности или что может быть проблема здесь.
Изменить: Связанный с этим вопрос: https://worldbuilding.stackexchange.com/info/12348/how-do-you-prove-youre-from-the-future:)
Ответы
Ответ 1
Один общий подход к ожиданию всех результатов (проваленных или нет) состоит в том, чтобы "поднять" сбои в новое представление в будущем, чтобы все фьючерсы завершились с некоторым результатом (хотя они могут завершиться с результатом, который представляет сбой). Один из естественных способов получить это - Try
.
Твиттер-реализация futures предоставляет метод liftToTry
который делает это тривиальным, но вы можете сделать нечто подобное со стандартной реализацией библиотеки:
import scala.util.{ Failure, Success, Try }
val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
_.map(Success(_)).recover { case t => Failure(t) }
)
Теперь Future.sequence(lifted)
будет завершено после завершения каждого будущего и будет представлять успехи и неудачи с помощью Try
.
Итак, общее решение для ожидания на всех исходных фьючерсах последовательности фьючерсов может выглядеть следующим образом, предполагая, что контекст выполнения, конечно, неявно доступен.
import scala.util.{ Failure, Success, Try }
private def lift[T](futures: Seq[Future[T]]) =
futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
def waitAll[T](futures: Seq[Future[T]]) =
Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used
waitAll(SeqOfFutures).map {
// do whatever with the completed futures
}
Ответ 2
A Future
, созданный Future.sequence
, завершается, когда:
- все фьючерсы успешно завершены или
- одно из фьючерсов потерпело неудачу
Второй момент - это то, что происходит в вашем случае, и имеет смысл завершить, как только один из завернутых Future
завершился неудачно, потому что wrapping Future
может содержать только один Throwable
в случае сбоя, Нет смысла ждать других фьючерсов, потому что результат будет тем же самым сбоем.
Ответ 3
Это пример, который поддерживает предыдущий ответ. Существует простой способ сделать это, используя только стандартные API Scala.
В этом примере я создаю 3 фьючерса. Они пройдут через 5, 7 и 9 секунд соответственно. Вызов Await.result
будет заблокирован до тех пор, пока все фьючерсы не будут решены. Как только все 3 фьючерса завершены, a
будет установлено на List(5,7,9)
, и выполнение продолжится.
Кроме того, если в любом из фьючерсов выбрано исключение, Await.result
будет немедленно разблокировать и выбросить исключение. Раскомментируйте строку Exception(...)
, чтобы увидеть это в действии.
try {
val a = Await.result(Future.sequence(Seq(
Future({
blocking {
Thread.sleep(5000)
}
System.err.println("A")
5
}),
Future({
blocking {
Thread.sleep(7000)
}
System.err.println("B")
7
//throw new Exception("Ha!")
}),
Future({
blocking {
Thread.sleep(9000)
}
System.err.println("C")
9
}))),
Duration("100 sec"))
System.err.println(a)
} catch {
case e: Exception ⇒
e.printStackTrace()
}
Ответ 4
Мы можем обогатить Seq[Future[T]]
своим собственным методом onComplete
через неявный класс:
def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
f map { Success(_) } recover { case e => Failure(e) }
def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
fs map { lift(_) }
implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
Future.sequence(lift(fs)) onComplete {
case Success(s) => f(s)
case Failure(e) => throw e // will never happen, because of the Try lifting
}
}
}
Затем в вашем конкретном MWE вы можете сделать:
val f1 = Future {
throw new Throwable("baaa") // emulating a future that bumped into an exception
}
val f2 = Future {
Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
2
}
val lf = List(f1, f2)
lf onComplete { _ map {
case Success(v) => ???
case Failure(e) => ???
}}
Это решение имеет то преимущество, что вы можете вызвать onComplete
в последовательности фьючерсов, как это было бы в одно будущее.