Как слушать N каналов? (оператор динамического выбора)
чтобы запустить бесконечный цикл выполнения двух подпрограмм, я могу использовать код ниже:
после получения сообщения оно запустит новую программу и продолжится навсегда.
c1 := make(chan string)
c2 := make(chan string)
go DoStuff(c1, 5)
go DoStuff(c2, 2)
for ; true; {
select {
case msg1 := <-c1:
fmt.Println("received ", msg1)
go DoStuff(c1, 1)
case msg2 := <-c2:
fmt.Println("received ", msg2)
go DoStuff(c2, 9)
}
}
Теперь я хотел бы иметь такое же поведение для N подпрограмм, но как будет выглядеть оператор select в этом случае?
Это бит кода, с которого я начал, но я не понимаю, как кодировать оператор select
numChans := 2
//I keep the channels in this slice, and want to "loop" over them in the select statemnt
var chans = [] chan string{}
for i:=0;i<numChans;i++{
tmp := make(chan string);
chans = append(chans, tmp);
go DoStuff(tmp, i + 1)
//How shall the select statment be coded for this case?
for ; true; {
select {
case msg1 := <-c1:
fmt.Println("received ", msg1)
go DoStuff(c1, 1)
case msg2 := <-c2:
fmt.Println("received ", msg2)
go DoStuff(c2, 9)
}
}
Ответы
Ответ 1
Вы можете сделать это, используя функцию Select
из пакетаlect:
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
Выбор выполняет операцию выбора, описанную в списке дел. Как и оператор Go select, он блокируется до тех пор, пока не может продолжаться хотя бы один из случаев, делает равномерный псевдослучайный выбор, а затем выполняет этот случай. Он возвращает индекс выбранного случая и, если этот случай был операцией приема, полученное значение и логическое значение, указывающее, соответствует ли значение отправке по каналу (в отличие от нулевого значения, полученного, поскольку канал закрыт).
Вы передаете массив структур SelectCase
которые идентифицируют канал для выбора, направление операции и значение для отправки в случае операции отправки.
Так что вы можете сделать что-то вроде этого:
cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
# ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()
Вы можете поэкспериментировать с более конкретным примером здесь: http://play.golang.org/p/8zwvSk4kjx
Ответ 2
Вы можете выполнить это, обернув каждый канал в goroutine, который пересылает сообщения на общий "совокупный" канал. Например:
agg := make(chan string)
for _, ch := range chans {
go func(c chan string) {
for msg := range c {
agg <- msg
}
}(ch)
}
select {
case msg <- agg:
fmt.Println("received ", msg)
}
Если вам нужно знать, с какого канала отправлено сообщение, вы можете перенести его в структуру с какой-либо дополнительной информацией, прежде чем пересылать ее в общий канал.
В моем (ограниченном) тестировании этот метод сильно выполняется с использованием пакета отражения:
$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect 1 5265109013 ns/op
BenchmarkGoSelect 20 81911344 ns/op
ok command-line-arguments 9.463s
Код производительности здесь
Ответ 3
Чтобы расширить некоторые комментарии к предыдущим ответам и дать более четкое сравнение, приведен пример обоих подходов, представленных до сих пор с учетом того же ввода, фрагмента каналов для чтения и функции для вызова каждого значения, которое также необходимо чтобы узнать, к какому каналу пришло значение.
Существуют три основных различия между подходами:
- Сложность
. Хотя частично это может быть предпочтение читателя, я считаю, что канальный подход более идиоматичен, прямолинейен и читабель.
- Производительность
. В моей системе Xeon amd64 каналы goroutines + channels выполняют решение отражения примерно на два порядка (в общем отражение в Go часто медленнее и должно использоваться только тогда, когда это абсолютно необходимо). Конечно, если есть какая-либо значительная задержка либо в функции обработки результатов, либо при записи значений на входные каналы, это различие в производительности может стать незначительным.
-
Сегментация блокировки/буферизации. Важность этого зависит от варианта использования. Чаще всего это либо не имеет значения, либо небольшая дополнительная буферизация в решении слияния goroutine может быть полезна для пропускной способности. Однако, если желательно иметь семантику, что только один писатель разблокирован, и он полностью обрабатывается до того, как любой другой писатель будет разблокирован, тогда это может быть достигнуто только с помощью решения отражения.
Примечание. Оба подхода могут быть упрощены, если не требуется "идентификатор" отправляющего канала или если исходные каналы никогда не будут закрыты.
Канал объединения Goroutine:
// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []<-chan string, fn func(int, string)) {
// Setup
type item struct {
int // index of which channel this came from
string // the actual string item
}
merged := make(chan item)
var wg sync.WaitGroup
wg.Add(len(chans))
for i, c := range chans {
go func(i int, c <-chan string) {
// Reads and buffers a single item from `c` before
// we even know if we can write to `merged`.
//
// Go doesn't provide a way to do something like:
// merged <- (<-c)
// atomically, where we delay the read from `c`
// until we can write to `merged`. The read from
// `c` will always happen first (blocking as
// required) and then we block on `merged` (with
// either the above or the below syntax making
// no difference).
for s := range c {
merged <- item{i, s}
}
// If/when this input channel is closed we just stop
// writing to the merged channel and via the WaitGroup
// let it be known there is one fewer channel active.
wg.Done()
}(i, c)
}
// One extra goroutine to watch for all the merging goroutines to
// be finished and then close the merged channel.
go func() {
wg.Wait()
close(merged)
}()
// "select-like" loop
for i := range merged {
// Process each value
fn(i.int, i.string)
}
}
Выбор выделения:
// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value "in-flight" (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []<-chan string, fn func(int, string)) {
// Setup
cases := make([]reflect.SelectCase, len(chans))
// `ids` maps the index within cases to the original `chans` index.
ids := make([]int, len(chans))
for i, c := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
}
ids[i] = i
}
// Select loop
for len(cases) > 0 {
// A difference here from the merging goroutines is
// that `v` is the only value "in-flight" that any of
// the workers have sent. All other workers are blocked
// trying to send the single value they have calculated
// where-as the goroutine version reads/buffers a single
// extra value from each worker.
i, v, ok := reflect.Select(cases)
if !ok {
// Channel cases[i] has been closed, remove it
// from our slice of cases and update our ids
// mapping as well.
cases = append(cases[:i], cases[i+1:]...)
ids = append(ids[:i], ids[i+1:]...)
continue
}
// Process each value
fn(ids[i], v.String())
}
}
[Полный код на игровой площадке Go.]
Ответ 4
Почему этот подход не будет работать, если кто-то отправляет события?
func main() {
numChans := 2
var chans = []chan string{}
for i := 0; i < numChans; i++ {
tmp := make(chan string)
chans = append(chans, tmp)
}
for true {
for i, c := range chans {
select {
case x = <-c:
fmt.Printf("received %d \n", i)
go DoShit(x, i)
default: continue
}
}
}
}