Scala ожидание последовательности фьючерсов

Я надеялся, что код вроде следующего будет ждать обоих фьючерсов, но это не так.

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

Я предположил, что seq.onComplete будет ждать их завершения до завершения, но не так; это приводит к:

true
false

.sequence было немного сложно найти в источнике scala.concurrent.Future, мне интересно, как реализовать параллель, которая ждет всех исходных фьючерсов (динамически размерной) последовательности или что может быть проблема здесь.

Изменить: Связанный с этим вопрос: https://worldbuilding.stackexchange.com/info/12348/how-do-you-prove-youre-from-the-future:)

Ответы

Ответ 1

Один общий подход к ожиданию всех результатов (проваленных или нет) состоит в том, чтобы "поднять" сбои в новое представление в будущем, чтобы все фьючерсы завершились с некоторым результатом (хотя они могут завершиться с результатом, который представляет сбой). Один из естественных способов получить это - Try.

Твиттер-реализация futures предоставляет метод liftToTry который делает это тривиальным, но вы можете сделать нечто подобное со стандартной реализацией библиотеки:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

Теперь Future.sequence(lifted) будет завершено после завершения каждого будущего и будет представлять успехи и неудачи с помощью Try.

Итак, общее решение для ожидания на всех исходных фьючерсах последовательности фьючерсов может выглядеть следующим образом, предполагая, что контекст выполнения, конечно, неявно доступен.

  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }

Ответ 2

A Future, созданный Future.sequence, завершается, когда:

  • все фьючерсы успешно завершены или
  • одно из фьючерсов потерпело неудачу

Второй момент - это то, что происходит в вашем случае, и имеет смысл завершить, как только один из завернутых Future завершился неудачно, потому что wrapping Future может содержать только один Throwable в случае сбоя, Нет смысла ждать других фьючерсов, потому что результат будет тем же самым сбоем.

Ответ 3

Это пример, который поддерживает предыдущий ответ. Существует простой способ сделать это, используя только стандартные API Scala.

В этом примере я создаю 3 фьючерса. Они пройдут через 5, 7 и 9 секунд соответственно. Вызов Await.result будет заблокирован до тех пор, пока все фьючерсы не будут решены. Как только все 3 фьючерса завершены, a будет установлено на List(5,7,9), и выполнение продолжится.

Кроме того, если в любом из фьючерсов выбрано исключение, Await.result будет немедленно разблокировать и выбросить исключение. Раскомментируйте строку Exception(...), чтобы увидеть это в действии.

  try {
    val a = Await.result(Future.sequence(Seq(
      Future({
        blocking {
          Thread.sleep(5000)
        }
        System.err.println("A")
        5
      }),
      Future({
        blocking {
          Thread.sleep(7000)
        }
        System.err.println("B")
        7
        //throw new Exception("Ha!")
      }),
      Future({
        blocking {
          Thread.sleep(9000)
        }
        System.err.println("C")
        9
      }))),
      Duration("100 sec"))

    System.err.println(a)
  } catch {
    case e: Exception ⇒
      e.printStackTrace()
  }

Ответ 4

Мы можем обогатить Seq[Future[T]] своим собственным методом onComplete через неявный класс:

  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f map { Success(_) } recover { case e => Failure(e) }

  def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] =
    fs map { lift(_) }

  implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal {
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = {
      Future.sequence(lift(fs)) onComplete {
        case Success(s) => f(s)
        case Failure(e) => throw e // will never happen, because of the Try lifting
      }
    }
  }

Затем в вашем конкретном MWE вы можете сделать:

  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2)

  lf onComplete { _ map {
    case Success(v) => ???
    case Failure(e) => ???
  }}

Это решение имеет то преимущество, что вы можете вызвать onComplete в последовательности фьючерсов, как это было бы в одно будущее.