Ожидание асинхронного вызова в будущем перед обработкой следующего сообщения в Akka
При получении событий Акка Актеры будут обрабатывать одно сообщение за раз, блокируя, пока запрос не будет завершен, прежде чем переходить к следующему сообщению.
Это хорошо работает для синхронных/блокирующих задач, однако, если я хочу выполнить асинхронный/неблокирующий запрос, Akka продолжит обработку, не дожидаясь завершения задачи.
Например:
def doThing():Future[Unit] = /* Non blocking request here */
def receive = {
case DoThing => doThing() pipeTo sender
}
Это вызовет doThing() и начнет обрабатывать будущее, но не дожидается его завершения до обработки следующего сообщения - он просто выполнит следующие сообщения в очереди как можно быстрее.
По сути, похоже, что Akka считает "возвращение будущего" "завершенной обработкой" и переходит к следующему сообщению.
Чтобы обрабатывать одно сообщение за раз, кажется, мне нужно активно блокировать поток Actor, чтобы он этого не делал
def receive = {
case DoThing => sender ! blocking(Await.result(doThing()))
}
Это похоже на очень неправильный подход - он искусственно блокирует поток в коде, который в противном случае полностью не блокирует.
Сравнивая Akka с, скажем, с актерами Elixir, мы можем легко избежать этой проблемы в первую очередь, используя хвостовой вызов для запроса следующего сообщения без искусственного блокирования.
Есть ли какой-либо способ в Akka для
a) Дождитесь завершения Future
перед обработкой следующего сообщения без блокировки потока.
b) Использовать явный хвостовой вызов или какой-либо другой механизм для использования потокового рабочего процесса вместо push на основе?
Ответы
Ответ 1
Как было предложено в комментариях, вы можете использовать свойство Stash
(http://doc.akka.io/docs/akka/current/scala/actors.html#Stash) для хранения входящих сообщений по мере того, как вы ждете для Future
.
Требуется сохранить текущего отправителя, чтобы вы неправильно закрыли ссылку на ссылку отправителя. Вы можете достичь этого с помощью простого класса case, такого как ниже, указанного ниже.
class MyActor extends Actor with Stash {
import context.dispatcher
// Save the correct sender ref by using something like
// case class WrappedFuture(senderRef: ActorRef, result: Any)
def doThing(): Future[WrappedFuture] = ???
override def receive: Receive = {
case msg: DoThing =>
doThing() pipeTo self
context.become({
case WrappedFuture(senderRef, result) =>
senderRef ! result
unstashAll()
context.unbecome()
case newMsg: DoThing =>
stash()
}, discardOld = false)
}
}
Ответ 2
Вместо того, чтобы иметь одного участника, занимающегося этой проблемой, иметь цепочку из двух:
- Актер 1 получает начальные сообщения, запускает все вызовы ввода-вывода и объединяет их в будущем
- Актер 2 получает результаты объединенного фьючерса.
Это не гарантирует сохранение порядка сообщений, поэтому, если вам это нужно, тогда Actor 2 должен быть осведомлен о сообщениях, которые видел актер 1, и, возможно, сам запустил ранние сообщения.
Я ничего не знаю о Акке, которая решает эту проблему. Может быть, есть библиотека, которая реализует такой шаблон?