Спящие актеры?

Какой лучший способ спящего актера? У меня есть актеры, созданные как агенты, которые хотят поддерживать разные части базы данных (включая получение данных из внешних источников). По ряду причин (в том числе не перегружая базу данных или сообщений и общую нагрузку), я хочу, чтобы участники спали между каждой операцией. Я смотрю на что-то вроде 10 объектов-актеров.

Актеры будут работать довольно много бесконечно, так как всегда будут появляться новые данные или сидеть в таблице, ожидающей распространения в других частях базы данных и т.д. Идея заключается в том, чтобы база данных была как можно более полной в любой момент времени.

Я мог бы сделать это с помощью бесконечного цикла и спать в конце каждого цикла, но в соответствии с http://www.scala-lang.org/node/242 участники используют пул потоков, который расширяется всякий раз, когда блокируются все потоки. Поэтому я думаю, что Thread.sleep в каждом актере будет плохой идеей, так как ненужные ненужные потоки.

Возможно, у меня мог бы быть центральный актер со своей собственной петлей, которая отправляет сообщения подписчикам на часы (например, наблюдатели часов асинхронных событий)?

Кто-нибудь делал что-либо подобное или имел какие-то предложения? Извините за дополнительную (возможно, лишнюю) информацию.

Приветствия

Джо

Ответы

Ответ 1

Нет необходимости явно приводить актера в спящий режим: использование loop и react для каждого актера означает, что в базовом пуле потоков будут ожидающие потоки, пока нет сообщений для участников.

В том случае, если вы хотите запланировать события для ваших участников, это довольно просто, используя однопоточный планировщик из служебных программ java.util.concurrent:

object Scheduler {
  import java.util.concurrent.Executors
  import scala.compat.Platform
  import java.util.concurrent.TimeUnit
  private lazy val sched = Executors.newSingleThreadScheduledExecutor();
  def schedule(f: => Unit, time: Long) {
    sched.schedule(new Runnable {
      def run = f
    }, time , TimeUnit.MILLISECONDS);
  }
}

Вы можете расширить это, чтобы выполнять периодические задачи, и он может использоваться таким образом:

val execTime = //...  
Scheduler.schedule( { Actor.actor { target ! message }; () }, execTime)

Затем вашему целевому актеру просто нужно реализовать соответствующий цикл react для обработки данного сообщения. Вам не нужно, чтобы у вас был спящий актер.

Ответ 2

В первом ответе был хороший момент для Эрланг, но он, похоже, исчез. Вы можете сделать тот же Erlang-подобный трюк с Scala актерами легко. Например. создайте планировщик, который не использует потоки:

import actors.{Actor,TIMEOUT}

def scheduler(time: Long)(f: => Unit) = {
  def fixedRateLoop {
    Actor.reactWithin(time) {
      case TIMEOUT => f; fixedRateLoop
      case 'stop => 
    }
  }
  Actor.actor(fixedRateLoop)
}

И пусть протестировать его (я сделал это правильно в Scala REPL) с помощью тестового клиента-клиента:

case class Ping(t: Long)

import Actor._
val test = actor { loop {
  receiveWithin(3000) {
    case Ping(t) => println(t/1000)
    case TIMEOUT => println("TIMEOUT")
    case 'stop => exit
  }
} }

Запустите планировщик:

import compat.Platform.currentTime
val sched = scheduler(2000) { test ! Ping(currentTime) }

и вы увидите что-то вроде этого

scala> 1249383399
1249383401
1249383403
1249383405
1249383407

что означает, что наш планировщик отправляет сообщение каждые 2 секунды, как ожидалось. Остановите планировщик:

sched ! 'stop

тестовый клиент начнет сообщать о тайм-аутах:

scala> TIMEOUT
TIMEOUT
TIMEOUT

остановите его:

test ! 'stop

Ответ 3

ActorPing (лицензия Apache) из лифта-утилита имеет расписание и расписаниеAtFixedRate Источник: ActorPing.scala

Из scaladoc:

Объект ActorPing назначает актера, который должен быть отправлен с определенным сообщением через определенные промежутки времени. Методы расписания возвращают объект ScheduledFuture, который может быть отменен при необходимости

Ответ 4

К сожалению, есть две ошибки в ответе oxbow_lakes.

Один из них - простая ошибка объявления (длительное время против времени: длинное), но второе - более тонкое.

oxbow_lakes объявляет запуск

def run = actors.Scheduler.execute(f) 

Это, однако, приводит к тому, что сообщения время от времени исчезают. То есть: они запланированы, но никогда не отправляются. Объявление запускается как

def run = f

исправил это для меня. Он сделал точный путь в ActorPing лифтинга.

Весь код планировщика становится:

object Scheduler {
    private lazy val sched = Executors.newSingleThreadedScheduledExecutor();
    def schedule(f: => Unit, time: Long) {
        sched.schedule(new Runnable {
          def run = f
        }, time - Platform.currentTime, TimeUnit.MILLISECONDS);
    }
}

Я попытался отредактировать сообщение oxbow_lakes, но не смог его сохранить (сломан?), но у меня нет прав на комментарий. Поэтому новый пост.