如何将对象添加到从该通道接收数据的goroutine的通道中



基本上,我试图使用Goroutines编写一个并发的SiteMap爬网。一个站点地图可以包含指向多个站点地图的链接,这些链接可以包含指向其他站点地点等的链接。

现在,这是我的设计:

worker:
     - receives url from channel
     - processesUrl(url)
processUrl:
     for each link in lookup(url):
         - if link is sitemap:
                channel <- url
           else:
               print(url)
main:
    - create 10 workers
    - chanel <- root url

问题是,在ProcessUrl()完成之前,工人将不会接受来自频道的新URL,并且ProcessUrl直到工人接受新的URL(如果将新URL添加到频道中)才能完成。。我可以使用什么并发设计将URL添加到任务队列的情况下,没有渠道,而无需忙碌或不等待channel <- url

如果有帮助,这是实际代码:

func (c *SitemapCrawler) worker() {
    for {
        select {
        case url := <-urlChan:
            fmt.Println(url)
            c.crawlSitemap(url)
        }
    }
}
func crawlUrl(url string) {
    defer crawlWg.Done()
    crawler := NewCrawler(url)
    for i := 0; i < MaxCrawlRate*20; i++ {
        go crawler.worker()
    }
    crawler.getSitemaps()
    pretty.Println(crawler.sitemaps)
    crawler.crawlSitemaps()
}
func (c SitemapCrawler) crawlSitemap(url string) {
    c.limiter.Take()
    resp, err := MakeRequest(url)
    if err != nil || resp.StatusCode != 200 {
        crawlWg.Done()
        return
    }
    var resp_txt []byte
    if strings.Contains(resp.Header.Get("Content-Type"), "html") {
        crawlWg.Done()
        return
    } else if strings.Contains(url, ".gz") || resp.Header.Get("Content-Encoding") == "gzip" {
        reader, err := gzip.NewReader(resp.Body)
        if err != nil {
            crawlWg.Done()
            panic(err)
        } else {
            resp_txt, err = ioutil.ReadAll(reader)
            if err != nil {
                crawlWg.Done()
                panic(err)
            }
        }
        reader.Close()
    } else {
        resp_txt, err = ioutil.ReadAll(resp.Body)
        if err != nil {
            //panic(err)
            crawlWg.Done()
            return
        }
    }
    io.Copy(ioutil.Discard, resp.Body)
    resp.Body.Close()
    d, err := libxml2.ParseString(string(resp_txt))
    if err != nil {
        crawlWg.Done()
        return
    }
    results, err := d.Find("//*[contains(local-name(), 'loc')]")
    if err != nil {
        crawlWg.Done()
        return
    }
    locs := results.NodeList()
    printLock.Lock()
    for i := 0; i < len(locs); i++ {
        newUrl := locs[i].TextContent()
        if strings.Contains(newUrl, ".xml") {
            crawlWg.Add(1)
            //go c.crawlSitemap(newUrl)
            urlChan <- newUrl
        } else {
            fmt.Println(newUrl)
        }
    }
    printLock.Unlock()
    crawlWg.Done()
}

当通道未缓冲时,将操作写入通道正在阻止。

创建一个缓冲通道:

urlChan := make(chan string, len(allUrls))

此通道已满,但是写操作将再次阻止。

另外,您可以使用开关。当写入"失败"时,它将立即落入默认值

select {
case urlChan <- url:
    fmt.Println("received message")
default:
    fmt.Println("no activity")
}

要暂停到频道的暂停执行以下

select {
case urlChan <- url:
    fmt.Println("received message")
case <-time.After(5 * time.Second):
    fmt.Println("timed out")
}

或最终将写入事件放在单独的GO通道

func write() {
    urlChan <- url
}
go write()

最新更新