Акка: правильное использование шаблона `ask`?
Я пытаюсь найти Futures
и попросить шаблон в akka.
Итак, я делаю двух актеров, и один просит другого отправить ему сообщение. Ну, согласно документации akka Futures
, актер должен запросить (?
) сообщение, и он предоставит ему Future
instanse. Затем актер должен блокировать (используя Await
), чтобы получить результаты Future
.
Хорошо, я никогда не буду делать свое будущее. Почему это?
Код:
package head_thrash
import akka.actor._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
object Main extends App {
val system = ActorSystem("actors")
val actor1 = system.actorOf(Props[MyActor], "node_1")
val actor2 = system.actorOf(Props[MyActor], "node_2")
actor2 ! "ping_other"
system.awaitTermination()
Console.println("Bye!")
}
class MyActor extends Actor with ActorLogging {
import akka.pattern.ask
implicit val timeout = Timeout(100.days)
def receive = {
case "ping_other" => {
val selection = context.actorSelection("../node_1")
log.info("Sending ping to node_1")
val result = Await.result(selection ? "ping", Duration.Inf) // <-- Blocks here forever!
log.info("Got result " + result)
}
case "ping" => {
log.info("Sending back pong!")
sender ! "pong"
}
}
}
Если я изменяю Duration.Inf
на 5.seconds
, то актер ждет 5 секунд, говорит, что мое будущее - Timeouted (путем бросания TimeoutException
), а затем другой актер, наконец, отвечает необходимым сообщением. Таким образом, никакой асинк не происходит. Зачем?:-(
Как правильно реализовать этот шаблон? Спасибо.
Ответы
Ответ 1
Официальная документация Akka говорит, что Await.result заставит текущий поток блокировать и ждать, пока Актер "завершит" Будущее с его ответом.
Странно, что ваш код блокируется навсегда, у вас есть только один поток для всего вашего приложения?
В любом случае, я думаю, что более "идиоматический" способ кодирования будет заключаться в использовании обратного вызова для будущего успеха.
def receive = {
case "ping_other" => {
val selection = context.actorSelection("../node_1")
log.info("Sending ping to node_1")
val future: Future[String] = ask(selection, "ping").mapTo[String]
future.onSuccess {
case result : String ⇒ log.info("Got result " + result)
}
}
...
Ответ 2
Две причины, почему это не работает.
Во-первых, "node_1" спрашивает себя, и "ping" не будет обработан, потому что он блокируется в ожидании запроса.
Кроме того, существует недостаток actorSelection для относительных путей ( "../node_1" ). Он обрабатывается с передачей сообщений, и поскольку ваш актер блокируется, он не может обработать какое-либо другое сообщение. Это было улучшено в предстоящей версии 2.3 Akka, но вам все равно следует избегать блокировки.