Обновление Scala коллекций поточно-безопасно
Мне было интересно, есть ли "простой" способ безопасно обновлять неизменяемые коллекции scala. Рассмотрим следующий код:
class a {
private var x = Map[Int,Int]()
def update(p:(Int,Int)) { x = x + (p) }
}
Этот код не является потокобезопасным, правильно? Под этим я подразумеваю, что если у нас есть два потока, вызывающих метод обновления, и мы можем сказать, что x - это карта, содержащая {1 = > 2}, а поток A вызывает обновление ((3,4)) и ему только удается выполнить x + (p) часть кода. Затем происходит перенастройка и поток B вызывает обновление ((13,37)) и успешно обновляет переменную x. Нить А продолжается и заканчивается.
После того, как все это закончится, значение x будет равно карте, содержащей {1 = > 2, 3 = > 4}, правильно? Вместо желаемого {1 = > 2, 3 = > 4, 13 = > 37}. Есть ли простой способ исправить это? Я надеюсь, что это неудивительно, что я прошу:)
Кстати, я знаю, что есть такие решения, как Akka STM, но я бы предпочел не использовать их, если это необходимо.
Спасибо за любой ответ!
edit: Кроме того, я бы предпочел решение без блокировки. Eeeew:)
Ответы
Ответ 1
В вашем случае, как писал Маурисио, ваша коллекция уже потокобезопасна, потому что она неизменна. Единственная проблема заключается в переназначении var
, который может быть не атомной операцией. Для этой конкретной проблемы самым простым вариантом является использование хороших классов в java.util.concurrent.atomic
, а именно AtomicReference
.
import java.util.concurrent.atomic.AtomicReference
class a {
private val x = new AtomicReference(Map[Int,Int]())
def update(p:(Int,Int)) {
while (true) {
val oldMap = x.get // get old value
val newMap = oldMap + p // update
if (x.compareAndSet(oldMap, newMap))
return // exit if update was successful, else repeat
}
}
}
Ответ 2
- из коробки
- атомарность
- безблокировочного
- О (1)
Проверьте это: http://www.scala-lang.org/api/2.11.4/index.html#scala.collection.concurrent.TrieMap
Ответ 3
Сама коллекция является потокобезопасной, поскольку она не имеет общего измененного состояния, но ваш код отсутствует, и нет возможности исправить это без блокировки, так как у вас есть общее изменяемое состояние. Ваш лучший вариант - заблокировать сам метод, обозначая его как синхронизированный.
Другим решением будет использование изменчивой параллельной карты, возможно java.util.concurrent.ConcurrentMap.
Ответ 4
Re. Ответ Jean-Philippe Pellet: вы можете сделать это немного более удобным для использования:
def compareAndSetSync[T](ref: AtomicReference[T])(logic: (T => T)) {
while(true) {
val snapshot = ref.get
val update = logic(snapshot)
if (ref.compareAndSet(snapshot, update)) return
}
}
def compareSync[T,V](ref: AtomicReference[T])(logic: (T => V)): V = {
var continue = true
var snapshot = ref.get
var result = logic(snapshot)
while (snapshot != ref.get) {
snapshot = ref.get
result = logic(snapshot)
}
result
}