Ответ 1
Если вы не против очень локализованного var
, вы можете сериализовать асинхронную обработку (каждый f(item)
) следующим образом (flatMap
выполняет сериализацию):
val fSerialized = {
var fAccum = Future{()}
for(item <- it) {
println(s"Processing ${item}")
fAccum = fAccum flatMap { _ => f(item) }
}
fAccum
}
fSerialized.onComplete{case resTry => println("All Done.")}
В общем, избегайте операций Await
- они блокируют (вид поражений точки асинхронизации, потребляет ресурсы и для неаккуратных конструкций, может быть заблокирован)
Прохладный трюк 1:
Вы можете объединить Futures
через этого обычного подозреваемого, flatMap
- он сериализует асинхронные операции. Есть ли что-то, что он не может сделать?; -)
def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}
val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)
fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
Ни один из вышеперечисленных блоков - основная нить проходит через несколько десятков наносекунд. Фьючерсы используются во всех случаях для выполнения параллельных потоков и отслеживания асинхронного состояния/результатов и цепной логики.
fSerialized
представляет собой совокупность двух различных асинхронных операций, соединенных вместе. Как только значение val оценивается, он немедленно запускает f1
(работает несинхронно). f1
работает как любой Future
- когда он заканчивается, он называет его onComplete
блоком обратного вызова. Здесь классный бит - flatMap
устанавливает этот аргумент как блок ответа f1
onComplete - поэтому f2
запускается, как только f1
завершается, без блокировки, опроса или расточительного использования ресурсов. Когда f2
завершено, то fSerialized
завершено - поэтому он запускает блок обратного вызова fSerialized.onComplete
- печать "Both Done".
Не только это, но вы можете объединять плоские карты столько, сколько хотите, с аккуратным кодом без спагетти
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
Если вы должны были сделать это через Future.onComplete, вам нужно было бы встроить последовательные операции как вложенные onComplete слои:
f1.onComplete{case res1Try =>
f2
f2.onComplete{case res2Try =>
f3
f3.onComplete{case res3Try =>
f4
f4.onComplete{ ...
}
}
}
}
Не так приятно.
Тест, чтобы доказать:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
fSerial.onComplete{case resTry => println("!!!! That a wrap !!!! Success=" + resTry.isSuccess)}
Прохладный трюк 2:
для -понимания типа:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
- это не что иное, как синтаксический сахар для этого:
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
что цепочка flatMaps, за которой следует финальная карта.
Это означает, что
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
совпадает с
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
Тест для проверки (следующий из предыдущего теста):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That a wrap !!!! Success=" + resTry.isSuccess)}
Не очень-то трюк 3:
К сожалению, вы не можете смешивать итераторы и фьючерсы в одном и том же понимании. Ошибка компиляции:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
И вложенность fors создает проблему. Следующие не сериализуют, но параллельно выполняют асинхронные блоки (вложенные методы не привязывают последующие фьючерсы с помощью flatMap/Map, а вместо этого цепочки как Iterable.flatMap {item = > f (item)} - не то же самое!)
val fSerial = {for {nextItem <- itemIterable} yield
for {nextRes <- f(nextItem)} yield "Did It"}.last
Также использование foldLeft/foldRight plus flatMap не работает так, как вы ожидали, - кажется ошибкой/ограничением; все асинхронные блоки обрабатываются параллельно (поэтому Iterator.foldLeft/Right
не работает с Future.flatMap
):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
fSerialized.onComplete{case resTry => println("!!!! That a wrap !!!! Success=" + resTry.isSuccess)}
Но это работает (используется var):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))