Как подождать несколько фьючерсов
Предположим, что у меня есть несколько фьючерсов и вам нужно подождать, пока ни один из них не завершится неудачей, или все из них не удастся.
Например: пусть есть 3 фьючерса: f1
, f2
, f3
.
-
Если f1
преуспевает, а f2
терпит неудачу, я не могу ждать f3
(и возвращать сбой клиенту).
-
Если f2
не работает, пока f1
и f3
все еще работают, я не жду их (и возвращаю сбой)
-
Если f1
завершается успешно, а затем f2
выполняется, я продолжаю ждать f3
.
Как вы его реализуете?
Ответы
Ответ 1
Вместо этого вы можете использовать для понимания следующее:
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
В этом примере фьючерсы 1, 2 и 3 запускаются параллельно. Затем, в понимании, мы ждем, пока не будут получены результаты 1, а затем 2, а затем 3. Если 1 или 2 терпят неудачу, мы больше не будем ждать 3. Если все 3 преуспевают, то aggFut
val будет содержать кортеж с 3 слотами, соответствующий результатам трех фьючерсов.
Теперь, если вам нужно поведение, в котором вы хотите прекратить ждать, если сначала скажите fut2, все становится немного сложнее. В приведенном выше примере вам нужно будет дождаться завершения работы fut1 до того, как произойдет сбой fut2. Чтобы решить эту проблему, вы можете попробовать что-то вроде этого:
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
Теперь это работает правильно, но проблема исходит из знания того, какой Future
удалить из Map
, когда он был успешно завершен. До тех пор, пока у вас есть способ правильно соотнести результат с будущим, который породил этот результат, тогда что-то подобное работает. Он просто рекурсивно сохраняет удаление завершенных фьючерсов с карты и затем вызывает Future.firstCompletedOf
на оставшихся Futures
, пока их не осталось, собирая результаты на этом пути. Это некрасиво, но если вам действительно нужно поведение, о котором вы говорите, то это или что-то подобное может сработать.
Ответ 2
Вы можете использовать обещание и отправить ему либо первый сбой, либо завершенный завершенный агрегированный успех:
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
val p = Promise[M[A]]()
// the first Future to fail completes the promise
in.foreach(_.onFailure{case i => p.tryFailure(i)})
// if the whole sequence succeeds (i.e. no failures)
// then the promise is completed with the aggregated success
Future.sequence(in).foreach(p trySuccess _)
p.future
}
Тогда вы можете Await
создать Future
, если хотите заблокировать, или просто map
в другое.
Разница с пониманием заключается в том, что здесь вы получаете ошибку первого сбой, тогда как для понимания вы получаете первую ошибку в порядке обхода входной коллекции (даже если первая из них была неудачной). Например:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order
и
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
Ответ 3
Вот решение без использования актеров.
import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger
// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
val remaining = new AtomicInteger(fs.length)
val p = promise[T]
fs foreach {
_ onComplete {
case s @ Success(_) => {
if (remaining.decrementAndGet() == 0) {
// Arbitrarily return the final success
p tryComplete s
}
}
case f @ Failure(_) => {
p tryComplete f
}
}
}
p.future
}
Ответ 4
Вы можете сделать это только с помощью фьючерсов. Здесь одна реализация. Обратите внимание, что это не прекратит выполнение раньше! В этом случае вам нужно сделать что-то более сложное (и, возможно, реализовать перерыв самостоятельно). Но если вы просто не хотите продолжать ждать чего-то, что не сработает, ключ должен продолжать ждать, когда первое закончится, и остановитесь, когда ничего не осталось или вы ударите исключение:
import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global
@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()):
Either[Throwable, Seq[A]] = {
val first = Future.firstCompletedOf(fs)
Await.ready(first, Duration.Inf).value match {
case None => awaitSuccess(fs, done) // Shouldn't happen!
case Some(Failure(e)) => Left(e)
case Some(Success(_)) =>
val (complete, running) = fs.partition(_.isCompleted)
val answers = complete.flatMap(_.value)
answers.find(_.isFailure) match {
case Some(Failure(e)) => Left(e)
case _ =>
if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
else Right( answers.map(_.get) ++: done )
}
}
}
Вот пример этого в действии, когда все работает нормально:
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
Но когда что-то пойдет не так:
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); throw new Exception("boo"); () },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)
scala> Bye!
Ответ 5
Для этого я бы воспользовался аккой Аккой. В отличие от понимания, он терпит неудачу, как только любой из фьючерсов терпит неудачу, поэтому в этом смысле он немного эффективнее.
class ResultCombiner(futs: Future[_]*) extends Actor {
var origSender: ActorRef = null
var futsRemaining: Set[Future[_]] = futs.toSet
override def receive = {
case () =>
origSender = sender
for(f <- futs)
f.onComplete(result => self ! if(result.isSuccess) f else false)
case false =>
origSender ! SomethingFailed
case f: Future[_] =>
futsRemaining -= f
if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
}
}
sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result
Затем создайте актера, отправьте ему сообщение (чтобы он знал, куда отправить свой ответ) и дождитесь ответа.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
val f4: Future[Result] = actor ? ()
implicit val timeout = new Timeout(30 seconds) // or whatever
Await.result(f4, timeout.duration).asInstanceOf[Result] match {
case SomethingFailed => println("Oh noes!")
case EverythingSucceeded => println("It all worked!")
}
} finally {
// Avoid memory leaks: destroy the actor
actor ! PoisonPill
}
Ответ 6
На этот вопрос был дан ответ, но я отправляю свое решение класса ценности (классы значений были добавлены в 2.10), так как их здесь нет. Пожалуйста, не стесняйтесь критиковать.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
def concurrently = ConcurrentFuture(self)
}
case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
}
def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
val p = Promise[B]()
val inner = f(outer.future)
inner.future onFailure { case t => p.tryFailure(t) }
outer.future onFailure { case t => p.tryFailure(t) }
inner.future onSuccess { case b => p.trySuccess(b) }
ConcurrentFuture(p.future)
}
ConcurrentFuture - это не оболочка будущего будущего, которая изменяет стандартную карту будущего /mapMap по умолчанию от do-this-then-that-to-to-all-and-fail-if-any-fail. Использование:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }
val f : Future[(Int,String,Double)] = {
for {
f1 <- func1.concurrently
f2 <- func2.concurrently
f3 <- func3.concurrently
} yield for {
v1 <- f1
v2 <- f2
v3 <- f3
} yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }
В приведенном выше примере f1, f2 и f3 будут выполняться одновременно, и если какой-либо сбой в любом порядке, будущее кортежа немедленно сработает.
Ответ 7
Вы можете использовать это:
val l = List(1, 6, 8)
val f = l.map{
i => future {
println("future " +i)
Thread.sleep(i* 1000)
if (i == 12)
throw new Exception("6 is not legal.")
i
}
}
val f1 = Future.sequence(f)
f1 onSuccess{
case l => {
logInfo("onSuccess")
l.foreach(i => {
logInfo("h : " + i)
})
}
}
f1 onFailure{
case l => {
logInfo("onFailure")
}
Ответ 8
Возможно, вы захотите проверить API будущего Twitter. Примечательно, что метод Future.collect. Он делает именно то, что вы хотите: https://twitter.github.io/scala_school/finagle.html
Исходный код Future.scala доступен здесь:
https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala