Блокировка звонков в Акку Актеры
Как новичок, я пытаюсь понять, как работают актеры. И, из документации, я думаю, я понимаю, что актеры - это объекты, которые выполняются в режиме синхронизации, а также то, что выполнение актера может содержать вызовы метода блокировки/синхронизации, например. db запросы
Но я не понимаю, что если вы напишете актера, у которого есть блокирующие вызовы внутри (например, выполнение блокирующего запроса), это испортит весь пул потоков (в том смысле, что использование процессора будет вниз и т.д.), правильно? Я имею в виду, что, по моему мнению, JVM не может понять, может ли он переключить этот поток на кого-то другого, если/когда актер делает блокирующий вызов.
Итак, учитывая природу concurrency, не должно ли быть очевидно, что актеры не должны делать никаких блокирующих вызовов, когда-либо?
Если это так, каков рекомендуемый способ выполнения неблокирующего/асинхронного вызова, скажем, вызов веб-службы, который извлекает что-то и отправляет сообщение другому игроку, когда этот запрос будет завершен? Если мы просто используем что-то вроде актера:
будущая карта {response = > x! response.body}
Это правильный способ справиться с этим?
Поблагодарили бы, если вы сможете это разъяснить мне.
Ответы
Ответ 1
Это действительно зависит от прецедента. Если запросы не нужно сериализовать, вы можете выполнить запрос в будущем и отправить результаты обратно отправителю следующим образом:
import scala.concurrent.{ future, blocking}
import akka.pattern.pipe
val resFut = future {
blocking {
executeQuery()
}
}
resFut pipeTo sender
Вы также можете создать специальный диспетчер исключительно для вызовов БД и использовать роутер для создания актера. Таким образом, вы также можете легко ограничить количество одновременных запросов БД.
Ответ 2
Действительно великое введение "Руководство Неофита к Scala Часть 14: Подход актера к Concurrency" http://danielwestheide.com/blog/2013/02/27/the-neophytes-guide-to-scala-part-14-the-actor-approach-to-concurrency.html.
Актер получает сообщение, завершает блокирование кода в будущем, в нем метод Future.onSuccess - отправляет результаты, используя другие асинхронные сообщения. Но будьте осторожны, что переменная отправителя может измениться, поэтому закройте ее (сделайте локальную ссылку в будущем объекте).
p.s.: Руководство Неофита к Scala - действительно отличная книга.
Обновлено: (добавлен пример кода)
У нас есть рабочий и менеджер. Менеджер устанавливает работу, которая должна быть выполнена, рабочие отчеты "получили" и запускают длительный процесс (сон 1000). Между тем системный пинг менеджер с сообщениями "живой" и менеджер пингует с ними работника. Когда работа выполнена - сотрудник уведомляет об этом менеджера.
Примечание: выполнение sleep 1000 выполняется в импортированном исполнителе пула потоков по умолчанию /global - вы можете получить головоломку потока.
NB: val commander = отправитель необходим, чтобы "закрыть" ссылку на оригинального отправителя, вызывать, когда onSuccess будет выполнен - текущий отправитель внутри актера может быть уже настроен на какой-либо другой "отправитель"...
Log:
01:35:12:632 Humming ...
01:35:12:633 manager: flush sent
01:35:12:633 worker: got command
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:633 manager alive
01:35:12:660 worker: started
01:35:12:662 worker: alive
01:35:12:662 manager: resource allocated
01:35:12:662 worker: alive
01:35:12:662 worker: alive
01:35:13:661 worker: done
01:35:13:663 manager: work is done
01:35:17:633 Shutdown!
код:
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import com.typesafe.config.ConfigFactory
import java.text.SimpleDateFormat
import java.util.Date
import scala.concurrent._
import ExecutionContext.Implicits.global
object Sample {
private val fmt = new SimpleDateFormat("HH:mm:ss:SSS")
def printWithTime(msg: String) = {
println(fmt.format(new Date()) + " " + msg)
}
class WorkerActor extends Actor {
protected def receive = {
case "now" =>
val commander = sender
printWithTime("worker: got command")
future {
printWithTime("worker: started")
Thread.sleep(1000)
printWithTime("worker: done")
}(ExecutionContext.Implicits.global) onSuccess {
// here commander = original sender who requested the start of the future
case _ => commander ! "done"
}
commander ! "working"
case "alive?" =>
printWithTime("worker: alive")
}
}
class ManagerActor(worker: ActorRef) extends Actor {
protected def receive = {
case "do" =>
worker ! "now"
printWithTime("manager: flush sent")
case "working" =>
printWithTime("manager: resource allocated")
case "done" =>
printWithTime("manager: work is done")
case "alive?" =>
printWithTime("manager alive")
worker ! "alive?"
}
}
def main(args: Array[String]) {
val config = ConfigFactory.parseString("" +
"akka.loglevel=DEBUG\n" +
"akka.debug.lifecycle=on\n" +
"akka.debug.receive=on\n" +
"akka.debug.event-stream=on\n" +
"akka.debug.unhandled=on\n" +
""
)
val system = ActorSystem("mine", config)
val actor1 = system.actorOf(Props[WorkerActor], "worker")
val actor2 = system.actorOf(Props(new ManagerActor(actor1)), "manager")
actor2 ! "do"
actor2 ! "alive?"
actor2 ! "alive?"
actor2 ! "alive?"
printWithTime("Humming ...")
Thread.sleep(5000)
printWithTime("Shutdown!")
system.shutdown()
}
}
Ответ 3
Вы правы, думая о пуле потоков, если вы планируете блокировать звонки в Akka. Чем больше вы блокируете, тем больше будет пул потоков. Полностью неблокирующая система действительно нуждается в пуле потоков, равном количеству ядер процессора вашего компьютера. Эталонная конфигурация использует пул в 3 раза больше ядер ЦП на машине, чтобы обеспечить некоторую блокировку:
# The core pool size factor is used to determine thread pool core size
# using the following formula: ceil(available processors * factor).
# Resulting size is then bounded by the core-pool-size-min and
# core-pool-size-max values.
core-pool-size-factor = 3.0
источник
Но вы можете увеличить akka.default-dispatcher.fork-join-executor.core-pool-size-factor
до более высокого номера, если вы делаете больше блокировки, или сделайте диспетчера, кроме по умолчанию, специально для блокировки вызовов с более высоким fork-join-executor.core-pool-size-factor
WRT, что лучший способ сделать блокировку вызовов в Akka. Я бы рекомендовал масштабировать, создав несколько экземпляров актеров, которые блокируют вызовы и помещают router в них, чтобы сделать их похожими на одного участника к остальной части вашего приложения.