Целлюлоидная асинхронность внутри блоков ruby ​​не работает

Попытка реализовать Celluloid async в моем рабочем примере, похоже, демонстрирует странное поведение.

здесь мой код выглядит

 class Indefinite
    include Celluloid

      def run!
         loop do 
           [1].each do |i|
             async.on_background
           end
         end
      end 


       def on_background
         puts "Running in background" 
       end
   end

   Indefinite.new.run!

но когда я запускаю вышеуказанный код, я никогда не вижу puts " Запуск в фоновом режиме"

Но если я поставлю sleep, код, похоже, работает.

class Indefinite
   include Celluloid

    def run! 
      loop do 
        [1].each do |i|
          async.on_background
        end
        sleep 0.5
      end 
    end


   def on_background
     puts "Running in background" 
   end
 end

 Indefinite.new.run!

Любая идея? почему такая разница в вышеупомянутом двух сценариях.

Спасибо.

Ответы

Ответ 1

Ваш основной цикл доминирует над потоками актера/приложения.

Вся ваша программа делает нереста фоновые процессы, но никогда не запускает их. Вам нужно, чтобы sleep в цикле было чисто, чтобы фоновые потоки могли привлечь внимание.

Обычно не рекомендуется иметь безусловный цикл, создающий бесконечные фоновые процессы, как вы здесь. Должна быть либо задержка, либо условное утверждение, установленное там... иначе у вас просто есть бесконечный цикл, создающий вещи, которые никогда не будут вызваны.

Подумайте об этом так: если вы положите puts "looping" только внутри своего цикла, в то время как вы не видите Running in the background... вы будете видеть looping снова и снова.


Подход №1: используйте блоки every или after.

Лучший способ исправить это - не использовать sleep внутри loop, но использовать блок after или every, например:

every(0.1) {
    on_background
}

Или лучше всего, если вы хотите убедиться, что процесс выполняется полностью перед запуском снова, используйте after вместо этого:

def run_method
    @running ||= false
    unless @running
        @running = true
        on_background
        @running = false
    end
    after(0.1) { run_method }
 end

Использование loop не является хорошей идеей с async, если не выполняется какой-либо контроль потока или процесс блокировки, например, с @server.accept... иначе он просто вытащит 100% CPU ядро без уважительной причины.

Кстати, вы также можете использовать now_and_every, а также now_and_after тоже... это сразу же запустит блок, а затем запустит его снова после того, как вы захотите.

Использование every показано в этом значении:


Идеальная ситуация, на мой взгляд:

Это грубый, но применимый пример:


require 'celluloid/current'

class Indefinite
  include Celluloid

  INTERVAL = 0.5
  ONE_AT_A_TIME = true

  def self.run!
    puts "000a Instantiating."
    indefinite = new
    indefinite.run
    puts "000b Running forever:"
    sleep
  end

  def initialize
    puts "001a Initializing."
    @mutex = Mutex.new if ONE_AT_A_TIME
    @running = false
    puts "001b Interval: #{INTERVAL}"
  end

  def run
    puts "002a Running."
    unless ONE_AT_A_TIME && @running
      if ONE_AT_A_TIME
        @mutex.synchronize {
          puts "002b Inside lock."
          @running = true
          on_background
          @running = false
        }
      else
        puts "002b Without lock."
        on_background
      end
    end
    puts "002c Setting new timer."
    after(INTERVAL) { run }
  end


  def on_background
    if ONE_AT_A_TIME
      puts "003 Running background processor in foreground."
    else
      puts "003 Running in background"
    end
  end
end

Indefinite.run!
puts "004 End of application."

Это будет его вывод, если ONE_AT_A_TIME - true:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
000b Running forever:
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.
002a Running.
002b Inside lock.
003 Running background processor in foreground.
002c Setting new timer.

И это будет его вывод, если ONE_AT_A_TIME - false:

000a Instantiating.
001a Initializing.
001b Interval: 0.5
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
000b Running forever:
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.
002a Running.
002b Without lock.
003 Running in background
002c Setting new timer.

Вам нужно быть более "evented", чем "threaded", чтобы правильно запускать задачи и сохранять область и состояние, а не выдавать команды между потоками/участниками... это то, что предоставляют блоки every и after. И кроме того, это хорошая практика в любом случае, даже если у вас не было Global Interpreter Lock, потому что в вашем примере это не похоже на то, что вы имеете дело с процессом блокировки. Если у вас был блокирующий процесс, то непременно должен быть бесконечный цикл. Но так как вы просто собираетесь создать бесконечное количество фоновых задач, прежде чем один из них будет обработан, вам нужно либо использовать sleep, как и ваш вопрос, либо использовать совсем другую стратегию, и использовать every и after, а именно, как Celluloid сам побуждает вас работать, когда дело касается обработки данных в виде сокетов любого типа.


Подход №2: используйте вызов рекурсивного метода.

Это только что появилось в группе Google. Приведенный ниже пример кода фактически позволяет выполнять другие задачи, даже если это бесконечный цикл.

Этот подход менее желателен, потому что он, вероятно, будет иметь больше накладных расходов, порождая серию волокон.

def work
    # ...
    async.work
end

ВопроС# 2: Thread против Fiber.

Второй вопрос заключается в следующем: loop { Thread.new { puts "Hello" } }

Это порождает бесконечное количество потоков процессов, управление которыми напрямую осуществляется с помощью RVM. Несмотря на то, что в RVM используется Global Interpreter Lock..., который использует только green threads, которые предоставляются самой операционной системой... вместо этого они обрабатываются самим процессом. Планировщик CPU для процесса запускает каждый Thread сам, без колебаний. А в случае примера Thread работает очень быстро и затем умирает.

По сравнению с задачей async используется Fiber. Итак, что происходит в случае по умолчанию:

  • Процесс начинается.
  • Создан актер.
  • Вызов метода вызывает цикл.
  • Loop вызывает метод async.
  • async метод добавляет задачу в почтовый ящик.
  • Почтовый ящик не вызывается, а цикл продолжается.
  • В почтовый ящик добавлена ​​другая задача async.
  • Это продолжается бесконечно.

Это связано с тем, что сам метод цикла представляет собой вызов Fiber, который никогда не приостанавливается (если не вызывается sleep!), и поэтому добавленная к почтовому ящику дополнительная задача никогда не вызывает новый Fiber. A Fiber ведет себя иначе, чем a Thread. Это хороший справочный материал, в котором обсуждаются различия:


ВопроС# 3: Celluloid против Celluloid::ZMQ.

Третий вопрос: почему include Celluloid ведет себя иначе, чем Celluloid::ZMQ...

Это потому, что Celluloid::ZMQ использует аварийный почтовый ящик, основанный на реакторе, по сравнению с Celluloid, который использует почтовый ящик на основе переменной условия.

Подробнее о режимах конвейерной обработки и выполнения:

В этом разница между двумя примерами. Если у вас есть дополнительные вопросы о том, как ведут себя эти почтовые ящики, не стесняйтесь публиковать сообщения в Google Group... главной динамикой, с которой вы сталкиваетесь, является уникальная природа GIL, взаимодействующая с поведением Fiber vs. Thread vs. Reactor.

Здесь вы можете узнать больше о схеме реактора:

И см. конкретный реактор, используемый Celluloid::ZMQ здесь:

Итак, что происходит в сценарии событий, связанных с почтовым ящиком, заключается в том, что при нажатии sleep это блокирующий вызов, который заставляет реактор перейти к следующей задаче в почтовом ящике.

Но также, и это уникально для вашей ситуации, конкретный реактор, используемый Celluloid::ZMQ, использует вечную библиотеку C... в частности, библиотеку 0MQ. Этот реактор является внешним по отношению к вашему приложению, который ведет себя иначе, чем Celluloid::IO или Celluloid сам, и именно поэтому поведение происходит иначе, чем вы ожидали.

Альтернатива поддержки нескольких ядер

Если для вас не важно поддерживать состояние и область видимости, если вы используете jRuby или Rubinius, которые не ограничены одним потоком операционной системы, а с помощью MRI, у которого есть Global Interpreter Lock, вы можете создать больше чем один актер, и вызывать async вызовы между участниками одновременно.

Но мое скромное мнение заключается в том, что в моем примере вам будет гораздо лучше работать с использованием очень высокочастотного таймера, например 0.001 или 0.1, который будет казаться мгновенным для всех целей и задач, но также позволит актеру нить много времени для переключения волокон и выполнения других задач в почтовом ящике.

Ответ 2

Сделайте эксперимент, немного изменив свой пример (мы его модифицируем, потому что таким образом мы получаем одно и то же "странное" поведение, делая вещи clearner):

class Indefinite
  include Celluloid

  def run!
    (1..100).each do |i|
      async.on_background i
    end
    puts "100 requests sent from #{Actor.current.object_id}"
  end 

  def on_background(num)
    (1..100000000).each {}
    puts "message #{num} on #{Actor.current.object_id}" 
  end
end

Indefinite.new.run!
sleep

# =>
# 100 requests sent from 2084
# message 1 on 2084
# message 2 on 2084
# message 3 on 2084
# ...

Вы можете запустить его на любом интерпретаторе Ruby, используя Celluloid или Celluloid::ZMQ, результат всегда будет таким же. Также обратите внимание, что вывод из Actor.current.object_id в обоих методах одинаковый, что дает нам ключ к пониманию того, что мы имеем дело с одним актером в нашем эксперименте.

Таким образом, нет никакой разницы между реализацией Ruby и Celluloid, если только этот эксперимент.

Позвольте сначала задать, почему этот код ведет себя таким образом?

Не трудно понять, почему это происходит. Целлулоид принимает входящие запросы и сохраняет их в очереди задач для соответствующего актера. Обратите внимание, что наш первоначальный вызов run! находится в верхней части очереди.

Затем целлулоид обрабатывает эти задачи по одному за раз. Если происходит блокирующий вызов или вызов sleep, согласно документации, будет вызвана следующая задача, не дожидаясь текущего задача должна быть завершена.

Обратите внимание, что в нашем эксперименте нет блокирующих вызовов. Это означает, что метод run! будет выполняться от начала до конца, и только после этого каждый вызов on_background будет вызываться в идеальном порядке.

И как он должен работать.

Если вы добавите вызов sleep в свой код, он уведомит Celluloid, что он должен начать обработку следующей задачи в очереди. Таким образом, поведение, которое вы имеете во втором примере.

Теперь перейдем к разделу о том, как создать систему, чтобы она не зависела от вызовов sleep, что по крайней мере странно.

На самом деле есть хороший пример на странице Целлулоид-ZMQ. Обратите внимание на этот цикл:

def run
  loop { async.handle_message @socket.read }
end

Первое, что он делает, это @socket.read. Обратите внимание, что это операция блокировки. Итак, Celluloid будет обрабатывать следующее сообщение в очереди (если оно есть). Как только @socket.read ответит, будет создана новая задача. Но эта задача не будет выполнена до того, как снова будет вызван @socket.read, тем самым блокируя выполнение и уведомляя Celluloid для обработки следующим элементом в очереди.

Вероятно, вы видите разницу с вашим примером. Вы ничего не блокируете, поэтому не даете Целлулоиду возможности обрабатывать очереди.

Как мы можем получить поведение, приведенное в примере Celluloid::ZMQ?

Первое (на мой взгляд, лучшее) решение состоит в том, чтобы иметь фактический блокирующий вызов, например @socket.read.

Если в коде нет блокирующих вызовов, и вам все равно нужно обрабатывать элементы в фоновом режиме, тогда вам следует рассмотреть другие механизмы, предоставляемые Celluloid.

Есть несколько вариантов с Celluloid. Можно использовать conditions, futures, notifications или просто вызывая wait/signal на низком уровне, как в этом примере:

class Indefinite
  include Celluloid

  def run!
    loop do
      async.on_background
      result = wait(:background) #=> 33
    end
  end 

  def on_background
    puts "background" 

    # notifies waiters, that they can continue
    signal(:background, 33)
  end
end

Indefinite.new.run!
sleep

# ...
# background
# background
# background
# ...

Используя sleep(0) с Celluloid::ZMQ

Я также заметил work.rb файл, упомянутый в вашем комментарии. Он содержит следующий цикл:

loop { [1].each { |i|  async.handle_message 'hello' } ; sleep(0) }

Похоже, что он делает правильную работу. Фактически, его запуск под jRuby показал, что он пропускает память. Чтобы сделать это еще более очевидным, попробуйте добавить вызов сна в тело handle_message:

def handle_message(message)
  sleep 0.5
  puts "got message: #{message}"
end

Использование высокой памяти, вероятно, связано с тем, что очередь заполняется очень быстро и не может быть обработана в заданное время. Это будет более проблематично, если handle_message будет более трудоемким, тогда он теперь.

Решения с sleep

Я скептически отношусь к решениям с sleep. Они потенциально требуют много памяти и даже создают утечки памяти. И не понятно, что вы должны передать как параметр методу sleep и почему.

Ответ 3

Как работают потоки с Celluloid

Celluloid не создает новый поток для каждой асинхронной задачи. Он имеет пул потоков, в котором он выполняет каждую задачу, синхронную и асинхронную. Ключевым моментом является то, что библиотека видит функцию run! как синхронную задачу и выполняет ее в том же контексте, что и асинхронная задача.

По умолчанию Celluloid запускает все в одном потоке, используя систему очередей для планирования асинхронных задач для более поздних версий. Он создает новые потоки только при необходимости.

Кроме того, Celluloid переопределяет функцию sleep. Это означает, что каждый раз, когда вы вызываете sleep в классе, расширяющем класс Celluloid, библиотека проверяет наличие в пуле несвязанных потоков. В вашем случае при первом вызове sleep 0.5 он создаст новый поток для выполнения асинхронных задач в очереди, пока первый поток будет спать.

Итак, в вашем первом примере работает только один поток Celluloid, выполняя цикл. В вашем втором примере работают две целлулоидные потоки, первая из которых выполняет цикл и спящую на каждой итерации, другая выполняет фоновое задание.

Вы можете, например, изменить свой первый пример для выполнения конечного числа итераций:

def run! 
  (0..100).each do
    [1].each do |i|
      async.on_background
    end
  end
  puts "Done!"
end

При использовании этой функции run! вы увидите, что Done! печатается до всех Running in background, что означает, что Целлулоид завершает выполнение функции run! перед запуском асинхронных задач в том же потоке.