Я понимаю, что мы сначала блокируем приемник s (который является типом Stat), а затем добавляем к нему, если счетчик существует.
Ответ 1
Вопросы:
Q1: зачем нам блокировать его? Что означает RWMutex
?
RW означает чтение/запись. CF doc: http://golang.org/pkg/sync/#RWMutex.
Нам нужно заблокировать его, чтобы другие подпрограммы/нитки не изменяли значение при его обработке.
Q2: s.countersLock.RLock()
- блокирует ли он весь приемник или только поле счетчиков в типе Stat?
Как мьютекс, блокировка происходит только тогда, когда вы вызываете функцию RLock()
. Если какой-либо другой горутин уже назвал WLock()
, он блокирует. Вы можете вызвать любое число RLock()
в пределах одного и того же goroutine, оно не будет заблокировано.
Поэтому он не блокирует другие поля, даже s.counters
. В вашем примере вы заблокируете поиск карты, чтобы найти правильный счетчик.
Q3: s.countersLock.RLock()
- блокирует ли это поле средних?
Нет, как сказано в Q2, a RLock
блокирует только себя.
Q4: Почему мы должны использовать RWMutex
? Я думал, что канал является предпочтительным способом обрабатывать concurrency в Голанге?
Канал очень полезен, но иногда этого недостаточно, и иногда это не имеет смысла.
Здесь, когда вы блокируете доступ к карте, мьютекс имеет смысл. С chan, вы должны иметь буферизованный chan из 1, отправлять до и получать после. Не очень интуитивно.
Q5: Что это за atomic.AddInt64
. Почему в этом случае нам нужен атом?
Эта функция будет увеличивать заданную переменную атомарным способом. В вашем случае у вас есть условие гонки: counter
- это указатель, и фактическая переменная может быть уничтожена после освобождения блокировки и перед вызовом atomic.AddInt64
.
Если вы не знакомы с такими вещами, я бы посоветовал вам придерживаться Mutexes и выполнять всю необходимую обработку между блокировкой/разблокировкой.
Q6: Почему мы должны разблокировать право до того, как мы добавим к нему? Вы не должны.
Я не знаю, что вы пытаетесь сделать, но вот пример (простой): https://play.golang.org/p/cVFPB-05dw
Ответ 2
Когда более одного потока * необходимо изменить одно и то же значение, необходим механизм блокировки для синхронизации доступа. Без него два или более потока * могут одновременно записываться в одно и то же значение, что приводит к поврежденной памяти, которая обычно приводит к сбою.
Пакет atomic обеспечивает быстрый и простой способ синхронизации доступа к примитивным значениям. Для счетчика это самый быстрый метод синхронизации. Он имеет методы с четко определенными вариантами использования, такими как приращение, уменьшение, свопинг и т.д.
Пакет sync обеспечивает способ синхронизации доступа к более сложным значениям, таким как карты, срезы, массивы или группы значений, Вы используете это для случаев использования, которые не определены в atomic.
В любом случае блокировка требуется только при записи. Несколько потоков * могут безопасно считывать одно и то же значение без механизма блокировки.
Давайте посмотрим на предоставленный вами код.
type Stat struct {
counters map[string]*int64
countersLock sync.RWMutex
averages map[string]*int64
averagesLock sync.RWMutex
}
func (s *Stat) Count(name string) {
s.countersLock.RLock()
counter := s.counters[name]
s.countersLock.RUnlock()
if counter != nil {
atomic.AddInt64(counter, int64(1))
return
}
}
Здесь отсутствует то, как инициализируются сами карты. И пока карты не мутируются. Если имена счетчиков предопределены и не могут быть добавлены позднее, вам не понадобится RWMutex. Этот код может выглядеть примерно так:
type Stat struct {
counters map[string]*int64
}
func InitStat(names... string) Stat {
counters := make(map[string]*int64)
for _, name := range names {
counter := int64(0)
counters[name] = &counter
}
return Stat{counters}
}
func (s *Stat) Count(name string) int64 {
counter := s.counters[name]
if counter == nil {
return -1 // (int64, error) instead?
}
return atomic.AddInt64(counter, 1)
}
(Примечание: я удалил средние значения, потому что он не использовался в исходном примере.)
Теперь, скажем, вы не хотели, чтобы ваши счетчики были предопределены. В этом случае вам понадобится мьютекс для синхронизации доступа.
Давайте попробуем его с помощью Mutex. Это просто, потому что только один поток * может удерживать Lock за раз. Если второй поток * пытается Lock перед первым выпуском их с Unlock, он ждет (или блокирует) ** до тех пор.
type Stat struct {
counters map[string]*int64
mutex sync.Mutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]*int64)}
}
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
s.mutex.Unlock()
return atomic.AddInt64(counter, 1)
}
Приведенный выше код будет работать отлично. Но есть две проблемы.
- Если между Lock() и Unlock() есть паника, мьютекс будет заблокирован навсегда, даже если вы должны восстановиться после паники. Этот код, вероятно, не будет паниковать, но в целом лучше использовать его, чтобы он мог.
- При извлечении счетчика выполняется исключительная блокировка. Только один поток * может считывать со счетчика за один раз.
Проблема №1 легко решить. Используйте defer:
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
return atomic.AddInt64(counter, 1)
}
Это гарантирует, что Unlock() всегда вызывается. И если по какой-то причине у вас есть больше одного возврата, вам нужно только указать Unlock() один раз во главе функции.
Проблема №2 может быть решена с помощью RWMutex. Как это работает, и почему оно полезно?
RWMutex является расширением Mutex и добавляет два метода: RLock и RUnlock. Есть несколько моментов, которые важно отметить RWMutex:
-
RLock является общей блокировкой чтения. Когда с ним фиксируется замок, другие потоки * также могут использовать свой собственный замок с RLock. Это означает, что несколько потоков * могут считываться одновременно. Это полуисключительно.
-
Если мьютекс заблокирован, вызов Lock заблокирован **. Если один или несколько читателей имеют блокировку, вы не можете писать.
-
Если мьютекс заблокирован для записи (с Lock), RLock будет блокировать **.
Хороший способ подумать об этом RWMutex - это Mutex с помощью считывателя. RLock увеличивает счетчик, а RUnlock уменьшает его, Вызов Lock будет заблокирован, пока этот счетчик будет > 0.
Возможно, вы думаете: если мое приложение будет читать тяжело, это означает, что писатель может быть заблокирован на неопределенный срок? Нет. Существует еще одно полезное свойство RWMutex:
- Если счетчик чтения > 0 и Lock вызывается, будущие вызовы RLock также будет блокироваться до тех пор, пока существующие читатели не выпустят свои блокировки, автор получил свою блокировку и позже выпустил ее.
Подумайте об этом, так как свет над регистром в продуктовом магазине, где говорится, что кассир открыт или нет. Люди в очереди могут остаться там, и им будет помогать, но новые люди не могут войти в строй. Как только последний оставшийся клиент получает помощь, кассир переходит на перерыв, и этот регистр либо остается закрытым, пока они не вернутся, либо заменены другим кассиром.
Позволяет изменить предыдущий пример RWMutex:
type Stat struct {
counters map[string]*int64
mutex sync.RWMutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]*int64)}
}
func (s *Stat) Count(name string) int64 {
var counter *int64
if counter = getCounter(name); counter == nil {
counter = initCounter(name);
}
return atomic.AddInt64(counter, 1)
}
func (s *Stat) getCounter(name string) *int64 {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.counters[name]
}
func (s *Stat) initCounter(name string) *int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
counter := s.counters[name]
if counter == nil {
value := int64(0)
counter = &value
s.counters[name] = counter
}
return counter
}
С помощью приведенного выше кода я разделил логику на функции getCounter
и initCounter
следующим образом:
- Держите код понятным. Было бы сложно RLock() и Lock() в той же функции.
- Освободите блокировки как можно раньше, используя отложенный.
В приведенном выше коде, в отличие от примера Mutex, вы можете одновременно увеличивать разные счетчики.
Еще одна вещь, которую я хотел отметить, - это все приведенные выше примеры, карта map[string]*int64
содержит указатели на счетчики, а не сами счетчики. Если вы должны были хранить счетчики на карте map[string]int64
, вам нужно будет использовать Mutex без atomic. Этот код будет выглядеть примерно так:
type Stat struct {
counters map[string]int64
mutex sync.Mutex
}
func InitStat() Stat {
return Stat{counters: make(map[string]int64)}
}
func (s *Stat) Count(name string) int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
s.counters[name]++
return s.counters[name]
}
Возможно, вы захотите сделать это, чтобы уменьшить сбор мусора - но это будет иметь значение только в том случае, если у вас есть тысячи счетчиков, - и даже тогда сами счетчики не занимают много места (по сравнению с чем-то вроде байтового буфера).
*
Когда я говорю "поток", я имею в виду "рутину". Нить на других языках - это механизм одновременного запуска одного или нескольких наборов кода. Нить дорогая для создания и срывания. Ходовая процедура построена поверх потоков, но повторно использует их. Когда рутинная спячка спит, основной поток может использоваться другой подпрограммой. Когда рутина просыпается, она может быть в другом потоке. Go обрабатывает все это за кулисами. - Но для всех целей и задач вы будете рассматривать рутину как поток, когда дело доходит до доступа к памяти. Тем не менее, вам не обязательно быть таким же консервативным при использовании go-подпрограмм, как и потоки.
**
Когда ходовая процедура блокируется Lock
, RLock
, каналом или Sleep, базовая нить может быть повторно использована. Никакой процессор не используется этой рутиной - думайте об этом как о ожидании в очереди. Как и другие языки, бесконечный цикл, такой как for {}
, блокируется, сохраняя процессор и занятый занятый, подумайте об этом, когда бегаете по кругу - вы получите головокружение, бросьте, а окружающих вас людей не будет очень счастлив.