Как справиться с длительной инициализацией актера Акка?
У меня есть актер, который создает дочерний актер для выполнения длинных вычислений.
Проблема в том, что инициализация дочернего актера занимает несколько секунд, и все сообщения, которые родительский актер отправляет ребенку между ним, создаются и полностью инициализируются, отбрасываются.
Это логика кода, который я использую:
class ChildActor extends Actor {
val tagger = IntializeTagger(...) // this takes a few seconds to complete
def receive = {
case Tag(text) => sender ! tagger.tag(text)
case "hello" => println("Hello")
case _ => println("Unknown message")
}
}
class ParentActor extends Actor {
val child = context.ActorOf(Props[ChildActor], name = "childactor")
// the below two messages seem to get lost
child ! "hello"
child ! Tag("This is my sample text")
def receive = {
...
}
}
Как я могу обойти эту проблему? Можно ли заставить родительского актера ждать, пока ребенок полностью инициализируется? Я буду использовать дочернего актера с маршрутизацией и, возможно, на удаленных актерских системах.
ИЗМЕНИТЬ
Следуя советам drexin, я изменил свой код на:
class ChildActor extends Actor {
var tagger: Tagger = _
override def preStart() = {
tagger = IntializeTagger(...) // this takes a few seconds to complete
}
def receive = {
case Tag(text) => sender ! tagger.tag(text)
case "hello" => println("Hello")
case _ => println("Unknown message")
}
}
class ParentActor extends Actor {
var child: ActorRef = _
override def preStart() = {
child = context.ActorOf(Props[ChildActor], name = "childactor")
// When I add
// Thread.sleep(5000)
// here messages are processed without problems
// wihout hardcoding the 5 seconds waiting
// the below two messages seem to get lost
child ! "hello"
child ! Tag("This is my sample text")
}
def receive = {
...
}
}
но проблема остается. Что мне не хватает?
Ответы
Ответ 1
Не инициализируйте tagger
в конструкторе, но в hook preStart
, таким образом, сообщения будут собраны в поле сообщения и доставлены, когда актер будет готов.
изменить
Вы должны сделать то же самое для создания актера в своем классе ParentActor
, потому что у вас будет такая же проблема, если ответ будет ChildActor
, прежде чем инициализируется ParentActor
.
edit2
Я создал простой пример, но я не смог воспроизвести ваши проблемы. Следующий код работает отлично:
import akka.actor._
case class Tag(x: String)
class ChildActor extends Actor {
type Tagger = String => String
var tagger: Tagger = _
override def preStart() = {
tagger = (x: String) => x+"@tagged" // this takes a few seconds to complete
Thread.sleep(2000) // simulate time taken to initialize Tagger
}
def receive = {
case Tag(text) => sender ! tagger(text)
case "hello" => println("Hello")
case _ => println("Unknown message")
}
}
class ParentActor extends Actor {
var child: ActorRef = _
override def preStart() = {
child = context.actorOf(Props[ChildActor], name = "childactor")
// When I add
// Thread.sleep(5000)
// here messages are processed without problems
// wihout hardcoding the 5 seconds waiting
// the below two messages seem to get lost
child ! "hello"
child ! Tag("This is my sample text")
}
def receive = {
case x => println(x)
}
}
object Main extends App {
val system = ActorSystem("MySystem")
system.actorOf(Props[ParentActor])
}
Выход:
[info] Running Main
Hello
This is my sample [email protected]
Ответ 2
Я думаю, что вы можете искать комбо Stash
и become
. Идея будет заключаться в том, что дочерний актер установит исходное состояние в неинициализированное, и в то время как в этом состоянии он будет закрывать все входящие сообщения до тех пор, пока он не будет полностью инициализирован. Когда он полностью инициализирован, вы можете разблокировать все сообщения перед тем, как переключить поведение в состояние инициализации. Простой пример:
class ChildActor2 extends Actor with Stash{
import context._
var dep:SlowDependency = _
override def preStart = {
val me = context.self
Future{
dep = new SlowDependency
me ! "done"
}
}
def uninitialized:Receive = {
case "done" =>
unstashAll
become(initialized)
case other => stash()
}
def initialized:Receive = {
case "a" => println("received the 'a' message")
case "b" => println("received the 'b' message")
}
def receive = uninitialized
}
Обратите внимание на preStart
, что я выполняю свою инициализацию асинхронно, чтобы не останавливать запуск актера. Теперь это немного уродливо, с закрытием измененного dep
var. Вы могли бы с уверенностью справиться с этим, отправив сообщение другому актеру, который обрабатывает экземпляр медленной зависимости и отправляет его этому актеру. После получения зависимости он вызывается become
для состояния initialized
.
Теперь есть одна оговорка с Stash
, и я вставляю ее прямо из документов Akka:
Please note that the Stash can only be used together with actors that
have a deque-based mailbox. For this, configure the mailbox-type of the
dispatcher to be a deque-based mailbox, such as
akka.dispatch.UnboundedDequeBasedMailbox (see Dispatchers (Scala)).
Теперь, если это не подходит вам, вы можете попробовать использовать метод типа DI
и позволить медленной зависимости вводиться в дочерний актер через его конструктор. Таким образом, вы определяете дочернего актера следующим образом:
class ChildActor(dep:SlowDependency) extends Actor{
...
}
Затем, при запуске этого актера, вы сделали бы это следующим образом:
context.actorOf(new Props().withCreator(new ChildActor(slowDep)), name = "child-actor")
Ответ 3
Я бы предложил отправить "готовое" сообщение от дочернего актера родителю и начать отправлять сообщения дочернему игроку только после того, как это сообщение будет получено. Вы можете сделать это только в методе receive()
для простых случаев использования или вы можете использовать become
или FSM
, чтобы изменить поведение родительского актера после того, как ребенок будет инициализирован (например, сохраните сообщения для ребенка в некотором промежуточном хранилище и отправить их всем, когда он будет готов).