Ответ 1
Ваш основной цикл доминирует над потоками актера/приложения.
Вся ваша программа делает нереста фоновые процессы, но никогда не запускает их. Вам нужно, чтобы sleep
в цикле было чисто, чтобы фоновые потоки могли привлечь внимание.
Обычно не рекомендуется иметь безусловный цикл, создающий бесконечные фоновые процессы, как вы здесь. Должна быть либо задержка, либо условное утверждение, установленное там... иначе у вас просто есть бесконечный цикл, создающий вещи, которые никогда не будут вызваны.
Подумайте об этом так: если вы положите puts "looping"
только внутри своего цикла, в то время как вы не видите Running in the background
... вы будете видеть looping
снова и снова.
Подход №1: используйте блоки every
или after
.
Лучший способ исправить это - не использовать sleep
внутри loop
, но использовать блок after
или every
, например:
every(0.1) {
on_background
}
Или лучше всего, если вы хотите убедиться, что процесс выполняется полностью перед запуском снова, используйте after
вместо этого:
def run_method
@running ||= false
unless @running
@running = true
on_background
@running = false
end
after(0.1) { run_method }
end
Использование loop
не является хорошей идеей с async
, если не выполняется какой-либо контроль потока или процесс блокировки, например, с @server.accept
... иначе он просто вытащит 100% CPU ядро без уважительной причины.
Кстати, вы также можете использовать now_and_every
, а также now_and_after
тоже... это сразу же запустит блок, а затем запустит его снова после того, как вы захотите.
Использование every
показано в этом значении:
Идеальная ситуация, на мой взгляд:
Это грубый, но применимый пример:
require 'celluloid/current'
class Indefinite
include Celluloid
INTERVAL = 0.5
ONE_AT_A_TIME = true
def self.run!
puts "000a Instantiating."
indefinite = new
indefinite.run
puts "000b Running forever:"
sleep
end
def initialize
puts "001a Initializing."
@mutex = Mutex.new if ONE_AT_A_TIME
@running = false
puts "001b Interval: #{INTERVAL}"
end
def run
puts "002a Running."
unless ONE_AT_A_TIME && @running
if ONE_AT_A_TIME
@mutex.synchronize {
puts "002b Inside lock."
@running = true
on_background
@running = false
}
else
puts "002b Without lock."
on_background
end
end
puts "002c Setting new timer."
after(INTERVAL) { run }
end
def on_background
if ONE_AT_A_TIME
puts "003 Running background processor in foreground."
else
puts "003 Running in background"
end
end
end
Indefinite.run!
puts "004 End of application."
Это будет его вывод, если ONE_AT_A_TIME
- true
:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
И это будет его вывод, если ONE_AT_A_TIME
- false
:
000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
Вам нужно быть более "evented", чем "threaded", чтобы правильно запускать задачи и сохранять область и состояние, а не выдавать команды между потоками/участниками... это то, что предоставляют блоки every
и after
. И кроме того, это хорошая практика в любом случае, даже если у вас не было Global Interpreter Lock
, потому что в вашем примере это не похоже на то, что вы имеете дело с процессом блокировки. Если у вас был блокирующий процесс, то непременно должен быть бесконечный цикл. Но так как вы просто собираетесь создать бесконечное количество фоновых задач, прежде чем один из них будет обработан, вам нужно либо использовать sleep
, как и ваш вопрос, либо использовать совсем другую стратегию, и использовать every
и after
, а именно, как Celluloid
сам побуждает вас работать, когда дело касается обработки данных в виде сокетов любого типа.
Подход №2: используйте вызов рекурсивного метода.
Это только что появилось в группе Google. Приведенный ниже пример кода фактически позволяет выполнять другие задачи, даже если это бесконечный цикл.
Этот подход менее желателен, потому что он, вероятно, будет иметь больше накладных расходов, порождая серию волокон.
def work
# ...
async.work
end
ВопроС# 2: Thread
против Fiber
.
Второй вопрос заключается в следующем: loop { Thread.new { puts "Hello" } }
Это порождает бесконечное количество потоков процессов, управление которыми напрямую осуществляется с помощью RVM
. Несмотря на то, что в RVM
используется Global Interpreter Lock
..., который использует только green threads
, которые предоставляются самой операционной системой... вместо этого они обрабатываются самим процессом. Планировщик CPU для процесса запускает каждый Thread
сам, без колебаний. А в случае примера Thread
работает очень быстро и затем умирает.
По сравнению с задачей async
используется Fiber
. Итак, что происходит в случае по умолчанию:
- Процесс начинается.
- Создан актер.
- Вызов метода вызывает цикл.
- Loop вызывает метод
async
. -
async
метод добавляет задачу в почтовый ящик. - Почтовый ящик не вызывается, а цикл продолжается.
- В почтовый ящик добавлена другая задача
async
. - Это продолжается бесконечно.
Это связано с тем, что сам метод цикла представляет собой вызов Fiber
, который никогда не приостанавливается (если не вызывается sleep
!), и поэтому добавленная к почтовому ящику дополнительная задача никогда не вызывает новый Fiber
. A Fiber
ведет себя иначе, чем a Thread
. Это хороший справочный материал, в котором обсуждаются различия:
ВопроС# 3: Celluloid
против Celluloid::ZMQ
.
Третий вопрос: почему include Celluloid
ведет себя иначе, чем Celluloid::ZMQ
...
Это потому, что Celluloid::ZMQ
использует аварийный почтовый ящик, основанный на реакторе, по сравнению с Celluloid
, который использует почтовый ящик на основе переменной условия.
Подробнее о режимах конвейерной обработки и выполнения:
В этом разница между двумя примерами. Если у вас есть дополнительные вопросы о том, как ведут себя эти почтовые ящики, не стесняйтесь публиковать сообщения в Google Group... главной динамикой, с которой вы сталкиваетесь, является уникальная природа GIL
, взаимодействующая с поведением Fiber
vs. Thread
vs. Reactor
.
Здесь вы можете узнать больше о схеме реактора:
- http://en.wikipedia.org/wiki/Reactor_pattern
- Объяснение шаблона "Реактор"
- В чем разница между моделью, управляемой событиями, и моделью реактора?
И см. конкретный реактор, используемый Celluloid::ZMQ
здесь:
Итак, что происходит в сценарии событий, связанных с почтовым ящиком, заключается в том, что при нажатии sleep
это блокирующий вызов, который заставляет реактор перейти к следующей задаче в почтовом ящике.
Но также, и это уникально для вашей ситуации, конкретный реактор, используемый Celluloid::ZMQ
, использует вечную библиотеку C... в частности, библиотеку 0MQ
. Этот реактор является внешним по отношению к вашему приложению, который ведет себя иначе, чем Celluloid::IO
или Celluloid
сам, и именно поэтому поведение происходит иначе, чем вы ожидали.
Альтернатива поддержки нескольких ядер
Если для вас не важно поддерживать состояние и область видимости, если вы используете jRuby
или Rubinius
, которые не ограничены одним потоком операционной системы, а с помощью MRI
, у которого есть Global Interpreter Lock
, вы можете создать больше чем один актер, и вызывать async
вызовы между участниками одновременно.
Но мое скромное мнение заключается в том, что в моем примере вам будет гораздо лучше работать с использованием очень высокочастотного таймера, например 0.001
или 0.1
, который будет казаться мгновенным для всех целей и задач, но также позволит актеру нить много времени для переключения волокон и выполнения других задач в почтовом ящике.