Тупик в ThreadPool
Я не смог найти достойную реализацию ThreadPool для Ruby, поэтому я написал мою (частично основанную на коде отсюда: http://snippets.dzone.com/posts/show/3276, но изменился на wait/signal и другую реализацию для завершения ThreadPool. Однако через некоторое время работы (имея 100 потоков и обработку около 1300 задач) он умирает с тупиком в строке 25 - он ждет нового задания. Любые идеи, почему это может случиться?
require 'thread'
begin
require 'fastthread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
while @mutex.synchronize {@running}
block = get_block
if block
block.call
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
end
end
end
end
def name
@thread.inspect
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there a job to be done
@cv.signal
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {[email protected]?}
end
def stop
@mutex.synchronize {@running = false}
# Signal the thread not to wait for a new job
@cv.signal
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@workers = []
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def shutdown
@mutex.synchronize {@workers.each {|w| w.stop}}
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
while true
@mutex.synchronize do
worker = get_worker
if worker
return worker.set_block(block)
else
# Wait for a free worker
@cv.wait(@mutex)
end
end
end
end
# Used by workers to report ready status
def signal
@cv.signal
end
private
def get_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new(self)
@workers << worker
worker
end
end
Ответы
Ответ 1
Итак, основная проблема с реализацией заключается в следующем: как убедиться, что сигнал не потерян и избежать мертвых блокировок?
По моему опыту, это ДЕЙСТВИТЕЛЬНО трудно достичь с переменными состояния и мьютексом, но легко с помощью семафоров. Так получилось, что ruby реализует объект под названием Queue (или SizedQueue), который должен решить проблему. Вот моя предлагаемая реализация:
require 'thread'
begin
require 'fasttread'
rescue LoadError
$stderr.puts "Using the ruby-core thread implementation"
end
class ThreadPool
class Worker
def initialize(thread_queue)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = thread_queue
@running = true
@thread = Thread.new do
@mutex.synchronize do
while @running
@cv.wait(@mutex)
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
end
@queue << self
end
end
end
end
def name
@thread.inspect
end
def get_block
@block
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
# Signal the thread in this class, that there a job to be done
@cv.signal
end
end
def reset_block
@block = nil
end
def busy?
@mutex.synchronize { [email protected]? }
end
def stop
@mutex.synchronize do
@running = false
@cv.signal
end
@thread.join
end
end
attr_accessor :max_size
def initialize(max_size = 10)
@max_size = max_size
@queue = Queue.new
@workers = []
end
def size
@workers.size
end
def busy?
@queue.size < @workers.size
end
def shutdown
@workers.each { |w| w.stop }
@workers = []
end
alias :join :shutdown
def process(block=nil,&blk)
block = blk if block_given?
worker = get_worker
worker.set_block(block)
end
private
def get_worker
if [email protected]? or @workers.size == @max_size
return @queue.pop
else
worker = Worker.new(@queue)
@workers << worker
worker
end
end
end
И вот простой тестовый код:
tp = ThreadPool.new 500
(1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
tp.shutdown
Ответ 2
Вы можете попробовать work_queue gem, предназначенный для координации работы между производителем и пулом рабочих потоков.
Ответ 3
Я немного предвзятый, но я бы предложил моделировать это на каком-то языке процесса и проверить его модель. Свободно доступными инструментами являются, например, набор инструментов mCRL2 (с использованием языка на основе ACP), Mobility Workbench (pi-calculus) и Spin (PROMELA).
В противном случае я бы предложил удалить каждый бит кода, который не является существенным для проблемы, и найти минимальный случай, когда происходит тупик. Я сомневаюсь, что это 100 потоков и 1300 задач, необходимых для того, чтобы зайти в тупик. В случае меньшего размера вы можете просто добавить некоторые отладочные отпечатки, которые обеспечивают достаточную информацию, чтобы решить проблему.
Ответ 4
Хорошо, проблема, похоже, в методе сигнала ThreadPool #. Что может произойти:
1 - весь ваш рабочий занят, и вы пытаетесь обработать новое задание
2 - строка 90 получает нулевого сотрудника
3 - рабочий освобождается и сигнализирует об этом, но сигнал теряется, поскольку ThreadPool его не ждет.
4 - вы попадаете в строку 95, ожидая, хотя есть свободный рабочий.
Ошибка здесь в том, что вы можете сигнализировать свободному работнику, даже если никто не слушает. Этот метод сигнала ThreadPool # должен быть:
def signal
@mutex.synchronize { @cv.signal }
end
И проблема такая же в объекте Worker. Что может произойти:
1 - Работник только что завершил работу
2 - проверяет (строка 17), если есть ожидание работы: нет
3 - пул потоков отправляет новое задание и сигнализирует об этом... но сигнал теряется
4 - Рабочий ждет сигнала, даже если он помечен как занятый
Вы должны поместить свой метод инициализации следующим образом:
def initialize(callback)
@mutex = Mutex.new
@cv = ConditionVariable.new
@callback = callback
@mutex.synchronize {@running = true}
@thread = Thread.new do
@mutex.synchronize do
while @running
block = get_block
if block
@mutex.unlock
block.call
@mutex.lock
reset_block
# Signal the ThreadPool that this worker is ready for another job
@callback.signal
else
# Wait for a new job
@cv.wait(@mutex)
end
end
end
end
end
Далее, методы Worker # get_block и Worker # reset_block больше не должны синхронизироваться. Таким образом, вы не можете иметь блок, назначенный работнику между тестом для блока и ожиданием сигнала.