Спящие актеры?
Какой лучший способ спящего актера? У меня есть актеры, созданные как агенты, которые хотят поддерживать разные части базы данных (включая получение данных из внешних источников). По ряду причин (в том числе не перегружая базу данных или сообщений и общую нагрузку), я хочу, чтобы участники спали между каждой операцией. Я смотрю на что-то вроде 10 объектов-актеров.
Актеры будут работать довольно много бесконечно, так как всегда будут появляться новые данные или сидеть в таблице, ожидающей распространения в других частях базы данных и т.д. Идея заключается в том, чтобы база данных была как можно более полной в любой момент времени.
Я мог бы сделать это с помощью бесконечного цикла и спать в конце каждого цикла, но в соответствии с http://www.scala-lang.org/node/242 участники используют пул потоков, который расширяется всякий раз, когда блокируются все потоки. Поэтому я думаю, что Thread.sleep в каждом актере будет плохой идеей, так как ненужные ненужные потоки.
Возможно, у меня мог бы быть центральный актер со своей собственной петлей, которая отправляет сообщения подписчикам на часы (например, наблюдатели часов асинхронных событий)?
Кто-нибудь делал что-либо подобное или имел какие-то предложения? Извините за дополнительную (возможно, лишнюю) информацию.
Приветствия
Джо
Ответы
Ответ 1
Нет необходимости явно приводить актера в спящий режим: использование loop
и react
для каждого актера означает, что в базовом пуле потоков будут ожидающие потоки, пока нет сообщений для участников.
В том случае, если вы хотите запланировать события для ваших участников, это довольно просто, используя однопоточный планировщик из служебных программ java.util.concurrent
:
object Scheduler {
import java.util.concurrent.Executors
import scala.compat.Platform
import java.util.concurrent.TimeUnit
private lazy val sched = Executors.newSingleThreadScheduledExecutor();
def schedule(f: => Unit, time: Long) {
sched.schedule(new Runnable {
def run = f
}, time , TimeUnit.MILLISECONDS);
}
}
Вы можете расширить это, чтобы выполнять периодические задачи, и он может использоваться таким образом:
val execTime = //...
Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)
Затем вашему целевому актеру просто нужно реализовать соответствующий цикл react
для обработки данного сообщения. Вам не нужно, чтобы у вас был спящий актер.
Ответ 2
В первом ответе был хороший момент для Эрланг, но он, похоже, исчез. Вы можете сделать тот же Erlang-подобный трюк с Scala актерами легко. Например. создайте планировщик, который не использует потоки:
import actors.{Actor,TIMEOUT}
def scheduler(time: Long)(f: => Unit) = {
def fixedRateLoop {
Actor.reactWithin(time) {
case TIMEOUT => f; fixedRateLoop
case 'stop =>
}
}
Actor.actor(fixedRateLoop)
}
И пусть протестировать его (я сделал это правильно в Scala REPL) с помощью тестового клиента-клиента:
case class Ping(t: Long)
import Actor._
val test = actor { loop {
receiveWithin(3000) {
case Ping(t) => println(t/1000)
case TIMEOUT => println("TIMEOUT")
case 'stop => exit
}
} }
Запустите планировщик:
import compat.Platform.currentTime
val sched = scheduler(2000) { test ! Ping(currentTime) }
и вы увидите что-то вроде этого
scala> 1249383399
1249383401
1249383403
1249383405
1249383407
что означает, что наш планировщик отправляет сообщение каждые 2 секунды, как ожидалось. Остановите планировщик:
sched ! 'stop
тестовый клиент начнет сообщать о тайм-аутах:
scala> TIMEOUT
TIMEOUT
TIMEOUT
остановите его:
test ! 'stop
Ответ 3
ActorPing (лицензия Apache) из лифта-утилита имеет расписание и расписаниеAtFixedRate Источник: ActorPing.scala
Из scaladoc:
Объект ActorPing назначает актера, который должен быть отправлен с определенным сообщением через определенные промежутки времени. Методы расписания возвращают объект ScheduledFuture, который может быть отменен при необходимости
Ответ 4
К сожалению, есть две ошибки в ответе oxbow_lakes.
Один из них - простая ошибка объявления (длительное время против времени: длинное), но второе - более тонкое.
oxbow_lakes объявляет запуск
def run = actors.Scheduler.execute(f)
Это, однако, приводит к тому, что сообщения время от времени исчезают. То есть: они запланированы, но никогда не отправляются. Объявление запускается как
def run = f
исправил это для меня. Он сделал точный путь в ActorPing лифтинга.
Весь код планировщика становится:
object Scheduler {
private lazy val sched = Executors.newSingleThreadedScheduledExecutor();
def schedule(f: => Unit, time: Long) {
sched.schedule(new Runnable {
def run = f
}, time - Platform.currentTime, TimeUnit.MILLISECONDS);
}
}
Я попытался отредактировать сообщение oxbow_lakes, но не смог его сохранить (сломан?), но у меня нет прав на комментарий. Поэтому новый пост.