Приостановка актера в Акке
У меня есть актер в Акке, который будет обрабатывать сообщения для создания определенных объектов. Некоторые поля этих объектов вычисляются на основе состояния других объектов в базе данных в момент создания.
Я бы хотел избежать создания условия гонки, когда обработка актера идет быстрее, чем база данных может сохранять объекты. Это может привести к несогласованным данным, следующим образом:
- Актер создает
Foo
и отправляет его другим участникам для дальнейшей обработки и сохранения
- Актеру предлагается создать еще один
Foo
. Поскольку первый еще не сохранен, новый создается на основе старого содержимого БД, тем самым создавая неправильный Foo
.
Теперь эта возможность довольно удалена, так как создание Foo
будет запускаться вручную. Но по-прежнему возможно, что двойной щелчок может вызвать проблемы при высокой нагрузке. И кто знает, будет ли завтра Foo
автоматически создано.
Следовательно, мне нужен какой-то способ заставить актера ждать и возобновить его работу только после подтверждения того, что Foo
сохранено.
Есть ли способ поставить актера в состояние ожидания и сказать ему возобновить его операции через некоторое время?
В принципе, я хотел бы использовать почтовый ящик в качестве очереди сообщений и контролировать скорость обработки очереди.
Ответы
Ответ 1
Нет, вы не можете приостановить действие актера: актеры всегда вытаскивают сообщения из своего почтового ящика как можно быстрее. Это оставляет только возможность того, что входящие запросы будут спрятаны, для последующей обработки:
class A(db: ActorRef) extends Actor with Stash {
def receive = {
case Request =>
doWork()
db ! Persist
context.setReceiveTimeout(5.seconds)
context.become({
case Request => stash()
case Persisted => context.unbecome(); unstashAll()
case ReceiveTimeout => throw new TimeoutException("not persisted")
}, discardOld = false)
}
}
Обратите внимание, что доставка сообщений не гарантируется (или база данных может быть недоступна), и поэтому рекомендуется время ожидания.
Основная проблема
Эта проблема проявляется главным образом в тех случаях, которые недостаточно хорошо выровнены между моделью актера и моделью домена: актер является единицей согласованности, но в вашем случае использования ваше согласованное изображение требует наличия актуального внешнего сущности (базы данных), чтобы актер поступал правильно. Я не могу рекомендовать решение, не зная больше о прецеденте, но попытайтесь переделать свою проблему с учетом этого.
Ответ 2
Оказывается, для этого требуется всего несколько строк. Это решение, с которым я столкнулся, что согласуется с предложением pagoda_5b:
class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
import QueueingActor._
def receive = {
case message =>
context.become({
case Resume =>
unstashAll()
context.unbecome()
case _ => stash()
})
nextActor ! message
}
}
object QueueingActor {
case class Resume()
}