Уход за гуслями
Я прохожу через тур, и я чувствую, что у меня довольно хорошее понимание языка, за исключением параллелизма.
На слайде 72 есть упражнение, которое просит читателя распараллеливать искатель веб-страниц (и чтобы он не перекрывал повторы, но я еще не получил его).
Вот что я имею до сих пор:
func Crawl(url string, depth int, fetcher Fetcher, ch chan string) {
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
ch <- fmt.Sprintln(err)
return
}
ch <- fmt.Sprintf("found: %s %q\n", url, body)
for _, u := range urls {
go Crawl(u, depth-1, fetcher, ch)
}
}
func main() {
ch := make(chan string, 100)
go Crawl("http://golang.org/", 4, fetcher, ch)
for i := range ch {
fmt.Println(i)
}
}
У меня есть вопрос о том, где поставить вызов close(ch)
. Если я поставлю defer close(ch)
где-нибудь в методе Crawl
, то я заканчиваю запись на закрытый канал в одном из порожденных goroutines, так как метод завершит выполнение до того, как порожденные goroutines сделают.
Если я опускаю вызов на close(ch)
, как показано в моем примере кода, программа блокируется после завершения выполнения всех goroutines, но основной поток все еще ожидает канал в цикле for, поскольку канал никогда не был закрыт.
Ответы
Ответ 1
Взгляд на раздел "Параллелизация" Effective Go приводит к идеям решения. В основном вы должны закрыть канал на каждом пути возврата функции. На самом деле это хороший вариант использования оператора defer:
func Crawl(url string, depth int, fetcher Fetcher, ret chan string) {
defer close(ret)
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
ret <- err.Error()
return
}
ret <- fmt.Sprintf("found: %s %q", url, body)
result := make([]chan string, len(urls))
for i, u := range urls {
result[i] = make(chan string)
go Crawl(u, depth-1, fetcher, result[i])
}
for i := range result {
for s := range result[i] {
ret <- s
}
}
return
}
func main() {
result := make(chan string)
go Crawl("http://golang.org/", 4, fetcher, result)
for s := range result {
fmt.Println(s)
}
}
Существенное отличие от вашего кода заключается в том, что каждый экземпляр Crawl получает собственный канал возврата, а функция вызывающего абонента собирает результаты в обратном канале.
Ответ 2
Я пошел с совершенно другим направлением с этим. Я мог бы ввести в заблуждение совет об использовании карты.
// SafeUrlMap is safe to use concurrently.
type SafeUrlMap struct {
v map[string]string
mux sync.Mutex
}
func (c *SafeUrlMap) Set(key string, body string) {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key] = body
c.mux.Unlock()
}
// Value returns mapped value for the given key.
func (c *SafeUrlMap) Value(key string) (string, bool) {
c.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mux.Unlock()
val, ok := c.v[key]
return val, ok
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, urlMap SafeUrlMap) {
defer wg.Done()
urlMap.Set(url, body)
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
for _, u := range urls {
if _, ok := urlMap.Value(u); !ok {
wg.Add(1)
go Crawl(u, depth-1, fetcher, urlMap)
}
}
return
}
var wg sync.WaitGroup
func main() {
urlMap := SafeUrlMap{v: make(map[string]string)}
wg.Add(1)
go Crawl("http://golang.org/", 4, fetcher, urlMap)
wg.Wait()
for url := range urlMap.v {
body, _ := urlMap.Value(url)
fmt.Printf("found: %s %q\n", url, body)
}
}
Ответ 3
O (1) поиск по времени url на карте вместо O (n) поиска на фрагменте всех посещенных URL-адресов должен помочь минимизировать время, проведенное внутри критического раздела, что представляет собой тривиальное количество времени для этого примера, но станет актуальным с масштабом,
WaitGroup, используемая для предотвращения возврата функции верхнего уровня Crawl() до тех пор, пока все подпрограммы для дочерних процессов не будут завершены.
func Crawl(url string, depth int, fetcher Fetcher) {
var str_map = make(map[string]bool)
var mux sync.Mutex
var wg sync.WaitGroup
var crawler func(string,int)
crawler = func(url string, depth int) {
defer wg.Done()
if depth <= 0 {
return
}
mux.Lock()
if _, ok := str_map[url]; ok {
mux.Unlock()
return;
}else{
str_map[url] = true
mux.Unlock()
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q %q\n", url, body, urls)
for _, u := range urls {
wg.Add(1)
go crawler(u, depth-1)
}
}
wg.Add(1)
crawler(url,depth)
wg.Wait()
}
func main() {
Crawl("http://golang.org/", 4, fetcher)
}
Ответ 4
Вот мое решение. У меня есть "основная" процедура, которая прослушивает канал URL-адресов и запускает новую процедуру обхода (которая отправляет обходные URL-адреса в канал), если она находит новые URL-адреса для сканирования.
Вместо явного закрытия канала у меня есть счетчик для незавершенных обходных goroutines, а когда счетчик равен 0, программа выходит, потому что ей нечего ждать.
func doCrawl(url string, fetcher Fetcher, results chan []string) {
body, urls, err := fetcher.Fetch(url)
results <- urls
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("found: %s %q\n", url, body)
}
}
func Crawl(url string, depth int, fetcher Fetcher) {
results := make(chan []string)
crawled := make(map[string]bool)
go doCrawl(url, fetcher, results)
// counter for unfinished crawling goroutines
toWait := 1
for urls := range results {
toWait--
for _, u := range urls {
if !crawled[u] {
crawled[u] = true
go doCrawl(u, fetcher, results)
toWait++
}
}
if toWait == 0 {
break
}
}
}
Ответ 5
Я реализовал его с помощью простого канала, где все горуты отправляют свои сообщения. Чтобы убедиться, что он закрыт, когда больше нет goroutines, я использую безопасный счетчик, который закрывает канал, когда счетчик равен 0.
type Msg struct {
url string
body string
}
type SafeCounter struct {
v int
mux sync.Mutex
}
func (c *SafeCounter) inc() {
c.mux.Lock()
defer c.mux.Unlock()
c.v++
}
func (c *SafeCounter) dec(ch chan Msg) {
c.mux.Lock()
defer c.mux.Unlock()
c.v--
if c.v == 0 {
close(ch)
}
}
var goes SafeCounter = SafeCounter{v: 0}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, ch chan Msg) {
defer goes.dec(ch)
// TODO: Fetch URLs in parallel.
// TODO: Don't fetch the same URL twice.
// This implementation doesn't do either:
if depth <= 0 {
return
}
if !cache.existsAndRegister(url) {
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
ch <- Msg{url, body}
for _, u := range urls {
goes.inc()
go Crawl(u, depth-1, fetcher, ch)
}
}
return
}
func main() {
ch := make(chan Msg, 100)
goes.inc()
go Crawl("http://golang.org/", 4, fetcher, ch)
for m := range ch {
fmt.Printf("found: %s %q\n", m.url, m.body)
}
}
Обратите внимание, что безопасный счетчик должен быть увеличен за пределами goroutine.
Ответ 6
Я использую срез, чтобы избежать обхода URL дважды, рекурсивная версия без параллелизма в порядке, но не уверен в этой версии параллелизма.
func Crawl(url string, depth int, fetcher Fetcher) {
var str_arrs []string
var mux sync.Mutex
var crawl func(string, int)
crawl = func(url string, depth int) {
if depth <= 0 {
return
}
mux.Lock()
for _, v := range str_arrs {
if url == v {
mux.Unlock()
return
}
}
str_arrs = append(str_arrs, url)
mux.Unlock()
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
go crawl(u, depth-1) // could delete "go" then it is recursive
}
}
crawl(url, depth)
return
}
func main() {
Crawl("http://golang.org/", 4, fetcher)
}
Ответ 7
Здесь мое решение, используя sync.WaitGroup и SafeCache из выбранных URL-адресов:
package main
import (
"fmt"
"sync"
)
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Safe to use concurrently
type SafeCache struct {
fetched map[string]string
mux sync.Mutex
}
func (c *SafeCache) Add(url, body string) {
c.mux.Lock()
defer c.mux.Unlock()
if _, ok := c.fetched[url]; !ok {
c.fetched[url] = body
}
}
func (c *SafeCache) Contains(url string) bool {
c.mux.Lock()
defer c.mux.Unlock()
_, ok := c.fetched[url]
return ok
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, cache SafeCache,
wg *sync.WaitGroup) {
defer wg.Done()
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
cache.Add(url, body)
for _, u := range urls {
if !cache.Contains(u) {
wg.Add(1)
go Crawl(u, depth-1, fetcher, cache, wg)
}
}
return
}
func main() {
cache := SafeCache{fetched: make(map[string]string)}
var wg sync.WaitGroup
wg.Add(1)
Crawl("http://golang.org/", 4, fetcher, cache, &wg)
wg.Wait()
}
Ответ 8
Ниже приведено простое решение для распараллеливания с использованием только синхронизации waitGroup.
var fetchedUrlMap = make(map[string]bool)
var mutex sync.Mutex
func Crawl(url string, depth int, fetcher Fetcher) {
//fmt.Println("In Crawl2 with url" , url)
if _, ok := fetchedUrlMap[url]; ok {
return
}
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
mutex.Lock()
fetchedUrlMap[url] = true
mutex.Unlock()
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
var wg sync.WaitGroup
for _, u := range urls {
// fmt.Println("Solving for ", u)
wg.Add(1)
go func(uv string) {
Crawl(uv, depth-1, fetcher)
wg.Done()
}(u)
}
wg.Wait()
}
Ответ 9
Идея, аналогичная принятому ответу, но без извлечения повторяющихся URL-адресов и печати непосредственно на консоль. defer() тоже не используется. Мы используем каналы, чтобы сигнализировать, когда горутины завершены. Идея SafeMap снята с SafeCounter, приведенного ранее в туре.
Для дочерних программ мы создаем массив каналов и ожидаем возвращения каждого дочернего элемента, ожидая на канале.
package main
import (
"fmt"
"sync"
)
// SafeMap is safe to use concurrently.
type SafeMap struct {
v map[string] bool
mux sync.Mutex
}
// SetVal sets the value for the given key.
func (m *SafeMap) SetVal(key string, val bool) {
m.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
m.v[key] = val
m.mux.Unlock()
}
// Value returns the current value of the counter for the given key.
func (m *SafeMap) GetVal(key string) bool {
m.mux.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer m.mux.Unlock()
return m.v[key]
}
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher, status chan bool, urlMap SafeMap) {
// Check if we fetched this url previously.
if ok := urlMap.GetVal(url); ok {
//fmt.Println("Already fetched url!")
status <- true
return
}
// Marking this url as fetched already.
urlMap.SetVal(url, true)
if depth <= 0 {
status <- false
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
status <- false
return
}
fmt.Printf("found: %s %q\n", url, body)
statuses := make ([]chan bool, len(urls))
for index, u := range urls {
statuses[index] = make (chan bool)
go Crawl(u, depth-1, fetcher, statuses[index], urlMap)
}
// Wait for child goroutines.
for _, childstatus := range(statuses) {
<- childstatus
}
// And now this goroutine can finish.
status <- true
return
}
func main() {
urlMap := SafeMap{v: make(map[string] bool)}
status := make(chan bool)
go Crawl("https://golang.org/", 4, fetcher, status, urlMap)
<- status
}
Ответ 10
Ниже мое решение. Кроме глобальной карты, мне осталось только изменить содержимое Crawl
. Как и другие решения, я использовал sync.Map
и sync.WaitGroup
. Я заблокировал важные части.
var m sync.Map
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
// This implementation doesn't do either:
if depth <= 0 {
return
}
// Don't fetch the same URL twice.
/////////////////////////////////////
_, ok := m.LoadOrStore(url, url) //
if ok { //
return //
} //
/////////////////////////////////////
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
// Fetch URLs in parallel.
/////////////////////////////////////
var wg sync.WaitGroup //
defer wg.Wait() //
for _, u := range urls { //
wg.Add(1) //
go func(u string) { //
defer wg.Done() //
Crawl(u, depth-1, fetcher) //
}(u) //
} //
/////////////////////////////////////
return
}
Ответ 11
Я думаю, что использование карты (так же, как мы могли бы использовать набор в других языках) и мьютекс - это самый простой подход:
func Crawl(url string, depth int, fetcher Fetcher) {
mux.Lock()
defer mux.Unlock()
if depth <= 0 || IsVisited(url) {
return
}
visit[url] = true
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
//
go Crawl(u, depth-1, fetcher)
}
return
}
func IsVisited(s string) bool {
_, ok := visit[s]
return ok
}
var mux sync.Mutex
var visit = make(map[string]bool)
func main() {
Crawl("https://golang.org/", 4, fetcher)
time.Sleep(time.Second)
}