Как правильно использовать sync.Cond?
Мне трудно понять, как правильно использовать sync.Cond
. Из того, что я могу сказать, существует условие гонки между блокировкой Locker и вызовом метода Wait условия. Этот пример добавляет искусственную задержку между двумя строками в главной горутине для имитации состояния гонки:
package main
import (
"sync"
"time"
)
func main() {
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
time.Sleep(1 * time.Second)
c.Broadcast()
}()
m.Lock()
time.Sleep(2 * time.Second)
c.Wait()
}
[Run on the Go Playground]
Это вызывает немедленную панику:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
/usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
/usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
/tmp/sandbox301865429/main.go:17 +0x1a0
Что я делаю неправильно? Как избежать этого очевидного состояния гонки? Есть ли лучшая конструкция синхронизации, которую я должен использовать?
Изменить: Я понимаю, что мне лучше объяснить проблему, которую я пытаюсь решить здесь. У меня есть длинный goroutine, который загружает большой файл и ряд других goroutines, которым необходим доступ к заголовкам HTTP, когда они доступны. Эта проблема сложнее, чем кажется.
Я не могу использовать каналы, потому что тогда только один goroutine получит значение. И некоторые из других goroutines будут пытаться получить заголовки долго после того, как они уже доступны.
Загружающий goroutine может просто хранить заголовки HTTP в переменной и использовать мьютексы для защиты доступа к ним. Однако это не дает возможность другим гортанам "подождать", чтобы они стали доступными.
Я думал, что оба sync.Mutex
и sync.Cond
вместе могут выполнить эту задачу, но кажется, что это невозможно.
Ответы
Ответ 1
Наконец-то я нашел способ сделать это, и он вообще не включает sync.Cond
- только мьютекс.
type Task struct {
m sync.Mutex
headers http.Header
}
func NewTask() *Task {
t := &Task{}
t.m.Lock()
go func() {
defer t.m.Unlock()
// ...do stuff...
}()
return t
}
func (t *Task) WaitFor() http.Header {
t.m.Lock()
defer t.m.Unlock()
return t.headers
}
Как это работает?
Мьютекс заблокирован в начале задачи, гарантируя, что что-либо, вызывающее WaitFor()
, будет заблокировано. Когда доступны заголовки и мьютексы, разблокированные goroutine, каждый вызов WaitFor()
будет выполняться по одному за раз. Все будущие вызовы (даже после окончания goroutine) не будут иметь проблем с блокировкой мьютекса, так как он всегда будет разблокирован.
Ответ 2
OP ответил на свой вопрос, но напрямую не ответил на исходный вопрос, я опубликую, как правильно использовать sync.Cond
.
Вам действительно не нужно sync.Cond
, если у вас есть один goroutine для каждой записи и чтения - достаточно одного sync.Mutex
для связи между ними. sync.Cond
может быть полезен в ситуациях, когда несколько читателей ожидают, что доступные ресурсы будут доступны.
var sharedRsc = make(map[string]interface{})
func main() {
var wg sync.WaitGroup
wg.Add(2)
m := sync.Mutex{}
c := sync.NewCond(&m)
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc1"])
c.L.Unlock()
wg.Done()
}()
go func() {
// this go routine wait for changes to the sharedRsc
c.L.Lock()
for len(sharedRsc) == 0 {
c.Wait()
}
fmt.Println(sharedRsc["rsc2"])
c.L.Unlock()
wg.Done()
}()
// this one writes changes to sharedRsc
c.L.Lock()
sharedRsc["rsc1"] = "foo"
sharedRsc["rsc2"] = "bar"
c.Broadcast()
c.L.Unlock()
wg.Wait()
}
Игровая площадка
Сказав, что использование каналов по-прежнему является рекомендуемым способом передачи данных, если позволяет ситуация.
Примечание: sync.WaitGroup
здесь используется только для ожидания завершения выполнения goroutines.
Ответ 3
Вы должны убедиться, что c.Broadcast вызывается после вашего звонка в c.Wait. Правильная версия вашей программы будет:
package main
import (
"fmt"
"sync"
)
func main() {
m := &sync.Mutex{}
c := sync.NewCond(m)
m.Lock()
go func() {
m.Lock() // Wait for c.Wait()
c.Broadcast()
m.Unlock()
}()
c.Wait() // Unlocks m
}
https://play.golang.org/p/O1r8v8yW6h
Ответ 4
package main
import (
"fmt"
"sync"
"time"
)
func main() {
m := sync.Mutex{}
m.Lock() // main gouroutine is owner of lock
c := sync.NewCond(&m)
go func() {
m.Lock() // obtain a lock
defer m.Unlock()
fmt.Println("3. goroutine is owner of lock")
time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
c.Broadcast() // State has been changed, publish it to waiting goroutines
fmt.Println("4. goroutine will release lock soon (deffered Unlock")
}()
fmt.Println("1. main goroutine is owner of lock")
time.Sleep(1 * time.Second) // initialization
fmt.Println("2. main goroutine is still lockek")
c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
// Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
m.Unlock()
fmt.Println("Done")
}
http://play.golang.org/p/fBBwoL7_pm
Ответ 5
Похоже, вы c.Wait for Broadcast, который никогда не случится с вашими интервалами времени.
С
time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()
ваш фрагмент, похоже, работает http://play.golang.org/p/OE8aP4i6gY. Я пропущу что-то, что вы пытаетесь достичь?
Ответ 6
Вот практический пример с двумя процедурами го. Они запускаются один за другим, но второй ожидает условия, переданного первым, прежде чем продолжить:
package main
import (
"sync"
"fmt"
"time"
)
func main() {
lock := sync.Mutex{}
lock.Lock()
cond := sync.NewCond(&lock)
waitGroup := sync.WaitGroup{}
waitGroup.Add(2)
go func() {
defer waitGroup.Done()
fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")
time.Sleep(1 * time.Second)
fmt.Println("First go routine broadcasts condition")
cond.Broadcast()
}()
go func() {
defer waitGroup.Done()
fmt.Println("Second go routine has started and is waiting on condition")
cond.Wait()
fmt.Println("Second go routine unlocked by condition broadcast")
}()
fmt.Println("Main go routine starts waiting")
waitGroup.Wait()
fmt.Println("Main go routine ends")
}
Вывод может незначительно отличаться, так как вторая процедура запуска может начаться раньше первой и наоборот:
Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends
https://gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70
Ответ 7
Да, вы можете использовать один канал для передачи заголовка в несколько процедур Go.
headerChan := make(chan http.Header)
go func() { // This routine can be started many times
header := <-headerChan // Wait for header
// Do things with the header
}()
// Feed the header to all waiting go routines
for more := true; more; {
select {
case headerChan <- r.Header:
default: more = false
}
}