Чисто-рубиновый параллельный хэш
Какой лучший способ реализовать хэш, который может быть изменен для нескольких потоков, но с наименьшим количеством блокировок. Для целей этого вопроса вы можете предположить, что хэш будет тяжелым для чтения. Он должен быть потокобезопасным во всех реализациях Ruby, включая те, которые работают по-настоящему одновременно, например JRuby, и должны быть написаны в pure-Ruby (не разрешены ни C, ни Java).
Не стесняйтесь представить наивное решение, которое всегда блокируется, но это вряд ли будет лучшим решением. Очки для элегантности, но меньшая вероятность блокировки выигрывает по более мелкому коду.
Ответы
Ответ 1
Хорошо, теперь, когда вы указали фактическое значение "threadsafe", вот две потенциальные реализации. Следующий код будет работать вечно в MRI и JRuby. Беззаконная реализация следует за моделью возможной согласованности, где каждый поток использует свое собственное представление о хеше, если мастер находится в потоке. Существует небольшая обманка, необходимая для того, чтобы сохранить всю информацию в потоке, не утечка памяти, но она обрабатывается и протестирована - размер процесса не увеличивает этот код. Обеим реализациям потребуется больше "полной" работы, то есть удаление, обновление и т.д. Потребует некоторого размышления, но одно из двух нижеприведенных концепций будет соответствовать вашим требованиям.
Очень важно, чтобы люди, читающие эту тему, понимали, что вся проблема принадлежит исключительно JRuby - в MRI достаточно встроенного хэша.
module Cash
def Cash.new(*args, &block)
env = ENV['CASH_IMPL']
impl = env ? Cash.const_get(env) : LocklessImpl
klass = defined?(JRUBY_VERSION) ? impl : ::Hash
klass.new(*args)
end
class LocklessImpl
def initialize
@hash = {}
end
def thread_hash
thread = Thread.current
thread[:cash] ||= {}
hash = thread[:cash][thread_key]
if hash
hash
else
hash = thread[:cash][thread_key] = {}
ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
hash
end
end
def thread_key
[Thread.current.object_id, object_id]
end
def []=(key, val)
time = Time.now.to_f
tuple = [time, val]
@hash[key] = tuple
thread_hash[key] = tuple
val
end
def [](key)
# check the master value
#
val = @hash[key]
# someone else is either writing the key or it has never been set. we
# need to invalidate our own copy in either case
#
if val.nil?
thread_val = thread_hash.delete(key)
return(thread_val ? thread_val.last : nil)
end
# check our own thread local value
#
thread_val = thread_hash[key]
# in this case someone else has written a value that we have never seen so
# simply return it
#
if thread_val.nil?
return(val.last)
end
# in this case there is a master *and* a thread local value, if the master
# is newer juke our own cached copy
#
if val.first > thread_val.first
thread_hash.delete(key)
return val.last
else
return thread_val.last
end
end
end
class LockingImpl < ::Hash
require 'sync'
def initialize(*args, &block)
super
ensure
extend Sync_m
end
def sync(*args, &block)
sync_synchronize(*args, &block)
end
def [](key)
sync(:SH){ super }
end
def []=(key, val)
sync(:EX){ super }
end
end
end
if $0 == __FILE__
iteration = 0
loop do
n = 42
hash = Cash.new
threads =
Array.new(10) {
Thread.new do
Thread.current.abort_on_exception = true
n.times do |key|
hash[key] = key
raise "#{ key }=nil" if hash[key].nil?
end
end
}
threads.map{|thread| thread.join}
puts "THREADSAFE: #{ iteration += 1 }"
end
end
Ответ 2
Проводка базы/наивное решение, просто чтобы увеличить мой счетчик стека:
require 'thread'
class ConcurrentHash < Hash
def initialize
super
@mutex = Mutex.new
end
def [](*args)
@mutex.synchronize { super }
end
def []=(*args)
@mutex.synchronize { super }
end
end
Ответ 3
Иегуда, я думаю, вы упомянули, что установка ивара была атомарной? Как насчет простой копии и замены?
require 'thread'
class ConcurrentHash
def initialize
@reader, @writer = {}, {}
@lock = Mutex.new
end
def [](key)
@reader[key]
end
def []=(key, value)
@lock.synchronize {
@writer[key] = value
@reader, @writer = @writer, @reader
@writer[key] = value
}
end
end
Ответ 4
Это класс оболочки вокруг Hash, который позволяет одновременно читателям, но блокирует все остальные типы доступа (включая итерационные чтения).
class LockedHash
def initialize
@hash = Hash.new
@lock = ThreadAwareLock.new()
@reader_count = 0
end
def [](key)
@lock.lock_read
ret = @hash[key]
@lock.unlock_read
ret
end
def []=(key, value)
@lock.lock_write
@hash[key] = value
@lock.unlock_write
end
def method_missing(method_sym, *arguments, &block)
if @hash.respond_to? method_sym
@lock.lock_block
val = lambda{@hash.send(method_sym,*arguments, &block)}.call
@lock.unlock_block
return val
end
super
end
end
Вот код блокировки, который он использует:
class RWLock
def initialize
@outer = Mutex.new
@inner = Mutex.new
@reader_count = 0
end
def lock_read
@outer.synchronize{@inner.synchronize{@reader_count += 1}}
end
def unlock_read
@inner.synchronize{@reader_count -= 1}
end
def lock_write
@outer.lock
while @reader_count > 0 ;end
end
def unlock_write
@outer.unlock
end
end
class ThreadAwareLock < RWLock
def initialize
@owner = nil
super
end
def lock_block
lock_write
@owner = Thread.current.object_id
end
def unlock_block
@owner = nil
unlock_write
end
def lock_read
super unless my_block?
end
def unlock_read
super unless my_block?
end
def lock_write
super unless my_block?
end
def unlock_write
super unless my_block?
end
def my_block?
@owner == Thread.current.object_id
end
end
Блокировка, зависящая от потока, позволяет вам блокировать класс один раз, а затем вызывать методы, которые обычно блокируются, и не блокировать их. Вам это нужно, потому что вы уступаете блокам внутри некоторых методов, и эти блоки могут вызывать методы блокировки для объекта, и вам не нужна ошибка взаимоблокировки или двойная блокировка. Вы можете использовать для этого счетную блокировку.
Здесь предпринята попытка реализовать блокировки чтения и записи на уровне ведра:
class SafeBucket
def initialize
@lock = RWLock.new()
@value_pairs = []
end
def get(key)
@lock.lock_read
pair = @value_pairs.select{|p| p[0] == key}
unless pair && pair.size > 0
@lock.unlock_read
return nil
end
ret = pair[0][1]
@lock.unlock_read
ret
end
def set(key, value)
@lock.lock_write
pair = @value_pairs.select{|p| p[0] == key}
if pair && pair.size > 0
pair[0][1] = value
@lock.unlock_write
return
end
@value_pairs.push [key, value]
@lock.unlock_write
value
end
def each
@value_pairs.each{|p| yield p[0],p[1]}
end
end
class MikeConcurrentHash
def initialize
@buckets = []
100.times {@buckets.push SafeBucket.new}
end
def [](key)
bucket(key).get(key)
end
def []=(key, value)
bucket(key).set(key, value)
end
def each
@buckets.each{|b| b.each{|key, value| yield key, value}}
end
def bucket(key)
@buckets[key.hash % 100]
end
end
Я перестал работать над этим, потому что он слишком медленный, поэтому каждый метод небезопасен (допускает мутации другими потоками во время итерации), и он не поддерживает большинство хеш-методов.
И вот тестовая жгута для одновременных хэшей:
require 'thread'
class HashHarness
Keys = [:a, :basic, :test, :harness, :for, :concurrent, :testing, :of, :hashes,
:that, :tries, :to, :provide, :a, :framework, :for, :designing, :a, :good, :ConcurrentHash,
:for, :all, :ruby, :implementations]
def self.go
h = new
r = h.writiness_range(20, 10000, 0, 0)
r.each{|k, v| p k + ' ' + v.map{|p| p[1]}.join(' ')}
return
end
def initialize(classes = [MikeConcurrentHash, JoshConcurrentHash, JoshConcurrentHash2, PaulConcurrentHash, LockedHash, Hash])
@classes = classes
end
def writiness_range(basic_threads, ops, each_threads, loops)
result = {}
@classes.each do |hash_class|
res = []
0.upto 10 do |i|
writiness = i.to_f / 10
res.push [writiness,test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)]
end
result[hash_class.name] = res
end
result
end
def test_one(hash_class, basic_threads, ops, each_threads, loops, writiness)
time = Time.now
threads = []
hash = hash_class.new
populate_hash(hash)
begin
basic_threads.times do
threads.push Thread.new{run_basic_test(hash, writiness, ops)}
end
each_threads.times do
threads.push Thread.new{run_each_test(hash, writiness, loops)}
end
threads.each{|t| t.join}
rescue ThreadError => e
p [e.message, hash_class.name, basic_threads, ops, each_threads, loops, writiness].join(' ')
return -1
end
p [hash_class.name, basic_threads, ops, each_threads, loops, writiness, Time.now - time].join(' ')
return Time.now - time
end
def run_basic_test(hash, writiness, ops)
ops.times do
rand < writiness ? hash[choose_key]= rand : hash[choose_key]
end
end
def run_each_test(hash, writiness, loops)
loops.times do
hash.each do |k, v|
if rand < writiness
each_write_work(hash, k, v)
else
each_read_work(k, v)
end
end
end
end
def each_write_work(hash, key, value)
hash[key] = rand
end
def each_read_work(key, value)
key.to_s + ": " + value.to_s
end
def choose_key
Keys[rand(Keys.size)]
end
def populate_hash(hash)
Keys.each{|key| hash[key]=rand}
end
end
Числа:
JRuby
Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
ConcurrentHash 2.098 3.179 2.971 3.083 2.731 2.941 2.564 2.480 2.369 1.862 1.881
LockedHash 1.873 1.896 2.085 2.058 2.001 2.055 1.904 1.921 1.873 1.841 1.630
Hash 0.530 0.672 0.685 0.822 0.719 0.877 0.901 0.931 0.942 0.950 1.001
И МРТ
Writiness 0.0 0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1.0
ConcurrentHash 9.214 9.913 9.064 10.112 10.240 10.574 10.566 11.027 11.323 11.837 13.036
LockedHash 19.593 17.712 16.998 17.045 16.687 16.609 16.647 15.307 14.464 13.931 14.146
Hash 0.535 0.537 0.534 0.599 0.594 0.676 0.635 0.650 0.654 0.661 0.692
Номера МРТ довольно поразительны. Блокировка в МРТ действительно отстой.
Ответ 5
Это может быть прецедентом для hamster gem
Hamster реализует Hash Array Mapped Tries (HAMT), а также некоторые другие стойкие структуры данных, в чистом Ruby.
Постоянные структуры данных неизменяемы и вместо изменения (изменения) структуры, например, путем добавления или замены пары ключ-значение в Hash, вместо этого вы возвращаете новую структуру данных, которая содержит изменение. Трюк с постоянными неизменными структурами данных состоит в том, что вновь возвращенная структура данных повторно использует как можно больше предшественника.
Я думаю, что для реализации с использованием хомяка вы использовали бы свою изменчивую хэш-оболочку, которая передает все чтения текущему значению постоянного неизменяемого хэша (т.е. должен быть быстрым), сохраняя при этом все записи с помощью мьютекса и заменяя на новое значение постоянного неизменяемого хэша после записи.
Например:
require 'hamster'
require 'hamster/experimental/mutable_hash'
hsh = Hamster.mutable_hash(:name => "Simon", :gender => :male)
# reading goes directly to hash
puts hsh[:name] # Simon
# writing is actually swapping to new value of underlying persistent data structure
hsh.put(:name, "Joe")
puts hsh[:name] # Joe
Итак, позвольте использовать это для аналогичного типа проблемы для описанного:
(gist здесь)
require 'hamster'
require 'hamster/experimental/mutable_hash'
# a bunch of threads with a read/write ratio of 10:1
num_threads = 100
num_reads_per_write = 10
num_loops = 100
hsh = Hamster.mutable_hash
puts RUBY_DESCRIPTION
puts "#{num_threads} threads x #{num_loops} loops, #{num_reads_per_write}:1 R/W ratio"
t0 = Time.now
Thread.abort_on_exception = true
threads = (0...num_threads).map do |n|
Thread.new do
write_key = n % num_reads_per_write
read_keys = (0...num_reads_per_write).to_a.shuffle # random order
last_read = nil
num_loops.times do
read_keys.each do |k|
# Reads
last_read = hsh[k]
Thread.pass
# Atomic increments in the correct ratio to reads
hsh.put(k) { |v| (v || 0) + 1 } if k == write_key
end
end
end
end
threads.map { |t| t.join }
t1 = Time.now
puts "Error in keys" unless (0...num_reads_per_write).to_a == hsh.keys.sort.to_a
puts "Error in values" unless hsh.values.all? { |v| v == (num_loops * num_threads) / num_reads_per_write }
puts "Time elapsed: #{t1 - t0} s"
Я получаю следующие результаты:
ruby 1.9.2p320 (2012-04-20 revision 35421) [x86_64-linux]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 5.763414627 s
jruby 1.7.0 (1.9.3p203) 2012-10-22 ff1ebbe on Java HotSpot(TM) 64-Bit Server VM 1.6.0_26-b03 [linux-amd64]
100 threads x 100 loops, 10:1 R/W ratio
Time elapsed: 1.697 s
Что вы думаете об этом?
Это решение больше похоже на то, как можно решить эту проблему в Scala или Clojure, хотя на этих языках скорее всего будет использоваться программная транзакционная память с низкоуровневой поддержкой ЦП для операций атомного сравнения и свопинга, которые.
Изменить. Стоит отметить, что одна из причин, по которой быстро выполняется хомяк, заключается в том, что он имеет путь чтения без блокировки. Пожалуйста, ответьте в комментариях, если у вас есть вопросы об этом или о том, как это работает.
Ответ 6
this (video, pdf) относится к хеш-таблице без блокировки, реализованной в Java.
spoiler: использует атомарные операции Compare-And-Swap (CAS), если они недоступны в Ruby, вы можете имитировать их с помощью блокировок. не уверен, что это даст какое-либо преимущество перед простыми защищенными блокировкой хеш-таблицами
Ответ 7
Не тестировался и наивный удар при оптимизации для чтения. Предполагается, что большую часть времени значение не будет заблокировано. Если это так, то жесткая петля будет пытаться, пока она не появится. Я помещаю Thread.critical
туда, чтобы гарантировать, что прочитанные потоки не будут выполняться до тех пор, пока запись не будет выполнена. Не уверен, нужна ли критическая часть, это действительно зависит от того, насколько вы тяжело понимаете, так что некоторые бенчмаркинга в порядке.
class ConcurrentHash < Hash
def initialize(*args)
@semaphore = Mutex.new
super
end
def []=(k,v)
begin
old_crit = Thread.critical
Thread.critical = true unless old_crit
@semaphore.synchronize { super }
ensure
Thread.critical = old_crit
end
end
def [](k)
while(true)
return super unless @semaphore.locked?
end
end
end
Может быть несколько других методов чтения, которые должны были бы проверить блокировку @semaphore, я не знаю, реализовано ли все другое в терминах # [].
Ответ 8
Я не совсем понимаю, что это значит. я думаю, что простейшая реализация - это просто
Hash
то есть встроенный рубиновый хеш является потокобезопасным, если потокобезопасность, которую вы подразумеваете, не взорвется, если > 1 поток пытается получить к нему доступ. этот код будет безопасно работать навсегда
n = 4242
hash = {}
loop do
a =
Thread.new do
n.times do
hash[:key] = :val
end
end
b =
Thread.new do
n.times do
hash.delete(:key)
end
end
c =
Thread.new do
n.times do
val = hash[:key]
raise val.inspect unless [nil, :val].include?(val)
end
end
a.join
b.join
c.join
p :THREADSAFE
end
Я подозреваю, что потоковая безопасность действительно означает ACID - например, write like hash [: key] =: val, за которым следует чтение, если has [: key] вернет: val. но никакое количество хитростей с запиранием не может обеспечить того, что последний из них всегда будет побеждать. например, скажем, у вас есть 42 потока, все обновляющие потокобезопасный хеш - какое значение должно читать 43'rd?? безусловно, вы не имеете в виду какой-то общий порядок написания - поэтому, если 42 потока активно пишут "правильное" значение, то любое право? но рубиновый встроенный хаш работает именно так...
возможно, вы имеете в виду что-то вроде
hash.each do ...
в одном потоке и
hash.delete(key)
не будут мешать друг другу? я могу себе представить, что это будет потокобезопасным, но это даже не безопасно в одном потоке с рубином MRI (очевидно, вы не можете изменить хэш во время итерации по нему)
так что вы можете быть более конкретным о том, что вы подразумеваете под "threadsafe"?
единственным способом дать ACID-семантику будет грубая блокировка (конечно, это может быть метод, который взял блок, но все равно внешний замок).
ruby thread scheduler не просто планирует запланировать поток smack в середине некоторой произвольной функции c (как встроенные хэш-методы aset aset), так что они эффективно потокобезопасны.
Ответ 9
К сожалению, я не могу добавить комментарий к Michael Sofaer, где он вводит: класс RWLock и класс LockedHash с @reader_count и т.д. (еще не хватает кармы)
Это решение не работает. Это дает ошибку:
в `unlock ': попытка разблокировать мьютекс, который не заблокирован (ThreadError)
Из-за логической ошибки: когда нужно разблокировать вещи, тогда разблокировка происходит 1 дополнительное время (из-за отсутствия проверки my_block?(). Вместо этого он блокирует ее, даже если разблокировка не нужна "это мой блок" ), и поэтому 2-й разблокировка на уже разблокированных немых вызывает исключение. (Я вложу полный код о том, как воспроизвести эту ошибку в конце этого сообщения).
Также Майкл упомянул "каждый метод небезопасен (допускает мутации другими потоками во время итерации)", что было критично для меня, поэтому я заканчиваю этим упрощенным решением, которое работает для всех моих случаев использования, и оно просто блокирует мьютекс на любой вызов какого-либо метода хеширования при вызове из другого потока (вызовы из того же потока, которому принадлежит блокировка, не блокируются, чтобы избежать взаимоблокировок):
#
# This TrulyThreadSafeHash works!
#
# Note if one thread iterating the hash by #each method
# then the hash will be locked for all other threads (they will not be
# able to even read from it)
#
class TrulyThreadSafeHash
def initialize
@mutex = Mutex.new
@hash = Hash.new
end
def method_missing(method_sym, *arguments, &block)
if [email protected]? # Returns true if this lock is currently held by current thread
# We're trying to lock only if mutex is not owned by the current thread (is not locked or is locked by some other thread).
# Following call will be blocking if mutex locked by other thread:
@mutex.synchronize{
return lambda{@hash.send(method_sym,*arguments, &block)}.call
}
end
# We already own the lock (from current thread perspective).
# We don't even check if @hash.respond_to?(method_sym), let make Hash
# respond properly on all calls (including bad calls (example: wrong method names))
lambda{@hash.send(method_sym,*arguments, &block)}.call
end
# since we're tyring to mimic Hash we'll pretend to respond as Hash would
def self.respond_to?(method_sym, include_private = false)
Hash.respond_to(method_sym, include_private)
end
# override Object to_s because our method_missing won't be called for to_s
def to_s(*arguments)
@mutex.synchronize{
return @hash.to_s
}
end
# And for those, who want to run extra mile:
# to make our class json-friendly we shoud require 'json' and uncomment this:
#def to_json(*options)
# @mutex.synchronize{
# return @hash.to_json(*options)
# }
#end
end
И теперь полный пример, чтобы продемонстрировать/воспроизвести ошибку двойной разблокировки в решении Майкл Дикейнер:
#!/usr/bin/env ruby
# ======= unchanged copy-paste part from Michael Sofaer answer (begin) =======
class LockedHash
def initialize
@hash = Hash.new
@lock = ThreadAwareLock.new()
@reader_count = 0
end
def [](key)
@lock.lock_read
ret = @hash[key]
@lock.unlock_read
ret
end
def []=(key, value)
@lock.lock_write
@hash[key] = value
@lock.unlock_write
end
def method_missing(method_sym, *arguments, &block)
if @hash.respond_to? method_sym
@lock.lock_block
val = lambda{@hash.send(method_sym,*arguments, &block)}.call
@lock.unlock_block
return val
end
super
end
end
class RWLock
def initialize
@outer = Mutex.new
@inner = Mutex.new
@reader_count = 0
end
def lock_read
@outer.synchronize{@inner.synchronize{@reader_count += 1}}
end
def unlock_read
@inner.synchronize{@reader_count -= 1}
end
def lock_write
@outer.lock
while @reader_count > 0 ;end
end
def unlock_write
@outer.unlock
end
end
class ThreadAwareLock < RWLock
def initialize
@owner = nil
super
end
def lock_block
lock_write
@owner = Thread.current.object_id
end
def unlock_block
@owner = nil
unlock_write
end
def lock_read
super unless my_block?
end
def unlock_read
super unless my_block?
end
def lock_write
super unless my_block?
end
def unlock_write
super unless my_block?
end
def my_block?
@owner == Thread.current.object_id
end
end
# ======= unchanged copy-paste part from Michael Sofaer answer (end) =======
# global hash object, which will be 'shared' across threads
$h = LockedHash.new
# hash_reader is just iterating through the 'shared' hash $h
# and prints specified delimeter (capitalized when last hash item read)
def hash_reader(delim)
loop{
count = 0
$h.each{
count += 1
if count != $h.size
$stderr.print delim
else
$stderr.puts delim.upcase
end
}
}
end
# fill hash with 10 items
10.times{|i|
$h[i] = i
}
# create a thread which will read $h hash
t1 = Thread.new(){
hash_reader("o")
}
t1.join # will never happen, but for completeness
что дает следующую ошибку:
./LockedHash_fails_to_unlock.rb
oooooooooO
./LockedHash_fails_to_unlock.rb:55:in `unlock': Attempt to unlock a mutex which is not locked (ThreadError)
from ./LockedHash_fails_to_unlock.rb:55:in `unlock_write'
from ./LockedHash_fails_to_unlock.rb:82:in `unlock_write'
from ./LockedHash_fails_to_unlock.rb:70:in `unlock_block'
from ./LockedHash_fails_to_unlock.rb:29:in `method_missing'
from ./LockedHash_fails_to_unlock.rb:100:in `block in hash_reader'
from ./LockedHash_fails_to_unlock.rb:98:in `loop'
from ./LockedHash_fails_to_unlock.rb:98:in `hash_reader'
from ./LockedHash_fails_to_unlock.rb:119:in `block in <main>'
Ответ 10
Поскольку вы упоминаете, что Хэш будет считаться тяжелым, наличие одной блокировки мьютекса как чтения, так и записи приведет к условиям гонки, которые, скорее всего, выиграны при чтении. Если это нормально с вами, то игнорируйте ответ.
Если вы хотите присвоить приоритет записи, это поможет блокировка чтения и записи. Следующий код основан на некотором старом назначении С++ для класса Operating Systems, поэтому может не быть лучшим качеством, но дает общую идею.
require 'thread'
class ReadWriteLock
def initialize
@critical_section = Mutex.new
@are_writers_finished = ConditionVariable.new
@are_readers_finished = ConditionVariable.new
@readers = 0
@writers = 0
@writer_locked = false
end
def read
begin
start_read
yield
ensure
end_read
end
end
def start_read
@critical_section.lock
while (@writers != 0 || @writer_locked)
@are_writers_finished.wait(@critical_section)
end
@readers += 1
@critical_section.unlock
end
def end_read
@critical_section.lock
if (@readers -= 1) == 0
@are_readers_finished.broadcast
end
@critical_section.unlock
end
def write
begin
start_write
yield
ensure
end_write
end
end
def start_write
@critical_section.lock
@writers += 1
while @readers > 0
@are_readers_finished.wait(@critical_section)
end
while @writer_locked
@are_writers_finished.wait(@critical_section)
end
@writers -= 1
@writer_locked = true
@critical_section.unlock
end
def end_write
@critical_section.lock
@writer_locked = false
@are_writers_finished.broadcast
@critical_section.unlock
end
end
Затем просто оберните [] = и [] в lock.write и lock.read. Может иметь влияние на производительность, но гарантирует, что записи будут "проходить" чтения. Полезность этого зависит от того, насколько на самом деле он тяжелый.