Можно ли мультиплексировать несколько каналов в один?
Идея состоит в том, чтобы иметь переменное количество каналов в срезе, нажимать каждое полученное через них значение в один канал и закрывать этот выходной канал после закрытия последнего из входных каналов. Что-то вроде этого, но для нескольких каналов больше двух:
func multiplex(cin1, cin2, cout chan int) {
n := 2
for {
select {
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n -= 1
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n -= 1
}
}
if n == 0 {
close(cout)
break
}
}
}
Вышеприведенный код избегает цикла занятости, так как нет случая default
, что хорошо (EDIT: похоже, что наличие ", ok" делает неактивное действие select, и цикл занят в конце концов. для примера, подумайте о коде, как будто он блокирует). Может ли такая же функциональность быть достигнута и с произвольным количеством входных каналов? Очевидно, что это можно было бы сделать, уменьшив разрез попарно на один канал, но я был бы более заинтересован в более простом решении, если это возможно.
Ответы
Ответ 1
Я считаю, что этот фрагмент делает то, что вы ищете. Я изменил подпись так, чтобы она поняла, что входы и выходы должны использоваться только для связи в одном направлении. Обратите внимание на добавление sync.WaitGroup
, вам нужен какой-то способ для всех входов, чтобы сигнализировать, что они были завершены, и это довольно просто.
func combine(inputs []<-chan int, output chan<- int) {
var group sync.WaitGroup
for i := range inputs {
group.Add(1)
go func(input <-chan int) {
for val := range input {
output <- val
}
group.Done()
} (inputs[i])
}
go func() {
group.Wait()
close(output)
} ()
}
Ответ 2
Изменить: добавлен примерный код примера сокращения и переупорядоченные части ответа.
Предпочтительным решением является отказ от "реструктуризации", так что у вас нет куска каналов ". В реструктуризации часто может использоваться функция, которую несколько goroutines могут отправлять на один канал. Поэтому вместо того, чтобы каждый из ваших источников отправлялся по отдельным каналам, а затем приходилось иметь дело с получением из нескольких каналов, просто создайте один канал и позвольте всем источникам отправлять по этому каналу.
Go не предлагает функцию для приема из куска каналов. Это часто задаваемый вопрос, и хотя предпочтительное решение является предпочтительным, есть способы его программирования. Решение, которое, как я думал, вы предлагаете в своем первоначальном вопросе, говоря, что "сокращение разреза в паре" является решением двоичного разрыва и покорения. Это работает отлично, если у вас есть решение для мультиплексирования двух каналов в одно. Ваш примерный код для этого очень близок к работе.
Вам просто не хватает одного маленького трюка, чтобы ваш примерный код работал. Где вы уменьшаете n, добавьте строку, чтобы установить переменную канала в nil. Например, я прочитал код
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n--
cin1 = nil
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n--
cin2 = nil
}
}
Это решение делает то, что вы хотите, и не ожидание.
Итак, полный пример, включающий это решение в функцию, которая мультиплексирует срез:
package main
import (
"fmt"
"time"
)
func multiplex(cin []chan int, cout chan int) {
var cin0, cin1 chan int
switch len(cin) {
case 2:
cin1 = cin[1]
fallthrough
case 1:
cin0 = cin[0]
case 0:
default:
cin0 = make(chan int)
cin1 = make(chan int)
half := len(cin) / 2
go multiplex(cin[:half], cin0)
go multiplex(cin[half:], cin1)
}
for cin0 != nil || cin1 != nil {
select {
case v, ok := <-cin0:
if ok {
cout <- v
} else {
cin0 = nil
}
case v, ok := <-cin1:
if ok {
cout <- v
} else {
cin1 = nil
}
}
}
close(cout)
}
func main() {
cin := []chan int{
make(chan int),
make(chan int),
make(chan int),
}
cout := make(chan int)
for i, c := range cin {
go func(x int, cx chan int) {
for i := 1; i <= 3; i++ {
time.Sleep(100 * time.Millisecond)
cx <- x*10 + i
}
close(cx)
}(i, c)
}
go multiplex(cin, cout)
for {
select {
case v, ok := <-cout:
if ok {
fmt.Println("main gets", v)
} else {
return
}
}
}
}
Ответ 3
Используя goroutines, я произвел это. Это то, что вы хотите?
package main
import (
"fmt"
)
func multiplex(cin []chan int, cout chan int) {
n := len(cin)
for _, ch := range cin {
go func(src chan int) {
for {
v, ok := <-src
if ok {
cout <- v
} else {
n-- // a little dangerous. Maybe use a channel to avoid missed decrements
if n == 0 {
close(cout)
}
break
}
}
}(ch)
}
}
// a main to test the multiplex
func main() {
cin := make([]chan int, 3)
cin[0] = make(chan int, 2)
cin[1] = make(chan int, 2)
cin[2] = make(chan int, 2)
cout := make(chan int, 2)
multiplex(cin, cout)
cin[1] <- 1
cin[0] <- 2
cin[2] <- 3
cin[1] <- 4
cin[0] <- 5
close(cin[1])
close(cin[0])
close(cin[2])
for {
v, ok := <-cout
if ok {
fmt.Println(v)
} else {
break
}
}
}
EDIT: Ссылки:
http://golang.org/ref/spec#Receive_operator
http://golang.org/ref/spec#Close