Go 語言 示例: 并發(fā)的Web爬蟲

2023-03-14 16:57 更新

原文鏈接:https://gopl-zh.github.io/ch8/ch8-06.html


8.6. 示例: 并發(fā)的Web爬蟲

在5.6節(jié)中,我們做了一個簡單的web爬蟲,用bfs(廣度優(yōu)先)算法來抓取整個網(wǎng)站。在本節(jié)中,我們會讓這個爬蟲并行化,這樣每一個彼此獨立的抓取命令可以并行進行IO,最大化利用網(wǎng)絡(luò)資源。crawl函數(shù)和gopl.io/ch5/findlinks3中的是一樣的。

gopl.io/ch8/crawl1

func crawl(url string) []string {
    fmt.Println(url)
    list, err := links.Extract(url)
    if err != nil {
        log.Print(err)
    }
    return list
}

主函數(shù)和5.6節(jié)中的breadthFirst(廣度優(yōu)先)類似。像之前一樣,一個worklist是一個記錄了需要處理的元素的隊列,每一個元素都是一個需要抓取的URL列表,不過這一次我們用channel代替slice來做這個隊列。每一個對crawl的調(diào)用都會在他們自己的goroutine中進行并且會把他們抓到的鏈接發(fā)送回worklist。

func main() {
    worklist := make(chan []string)

    // Start with the command-line arguments.
    go func() { worklist <- os.Args[1:] }()

    // Crawl the web concurrently.
    seen := make(map[string]bool)
    for list := range worklist {
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                go func(link string) {
                    worklist <- crawl(link)
                }(link)
            }
        }
    }
}

注意這里的crawl所在的goroutine會將link作為一個顯式的參數(shù)傳入,來避免“循環(huán)變量快照”的問題(在5.6.1中有講解)。另外注意這里將命令行參數(shù)傳入worklist也是在一個另外的goroutine中進行的,這是為了避免channel兩端的main goroutine與crawler goroutine都嘗試向?qū)Ψ桨l(fā)送內(nèi)容,卻沒有一端接收內(nèi)容時發(fā)生死鎖。當(dāng)然,這里我們也可以用buffered channel來解決問題,這里不再贅述。

現(xiàn)在爬蟲可以高并發(fā)地運行起來,并且可以產(chǎn)生一大坨的URL了,不過還是會有倆問題。一個問題是在運行一段時間后可能會出現(xiàn)在log的錯誤信息里的:

$ go build gopl.io/ch8/crawl1
$ ./crawl1 http://gopl.io/
http://gopl.io/
https://golang.org/help/
https://golang.org/doc/
https://golang.org/blog/
...
2015/07/15 18:22:12 Get ...: dial tcp: lookup blog.golang.org: no such host
2015/07/15 18:22:12 Get ...: dial tcp 23.21.222.120:443: socket: too many open files
...

最初的錯誤信息是一個讓人莫名的DNS查找失敗,即使這個域名是完全可靠的。而隨后的錯誤信息揭示了原因:這個程序一次性創(chuàng)建了太多網(wǎng)絡(luò)連接,超過了每一個進程的打開文件數(shù)限制,既而導(dǎo)致了在調(diào)用net.Dial像DNS查找失敗這樣的問題。

這個程序?qū)嵲谑翘麐尣⑿辛恕o窮無盡地并行化并不是什么好事情,因為不管怎么說,你的系統(tǒng)總是會有一些個限制因素,比如CPU核心數(shù)會限制你的計算負載,比如你的硬盤轉(zhuǎn)軸和磁頭數(shù)限制了你的本地磁盤IO操作頻率,比如你的網(wǎng)絡(luò)帶寬限制了你的下載速度上限,或者是你的一個web服務(wù)的服務(wù)容量上限等等。為了解決這個問題,我們可以限制并發(fā)程序所使用的資源來使之適應(yīng)自己的運行環(huán)境。對于我們的例子來說,最簡單的方法就是限制對links.Extract在同一時間最多不會有超過n次調(diào)用,這里的n一般小于文件描述符的上限值,比如20。這和一個夜店里限制客人數(shù)目是一個道理,只有當(dāng)有客人離開時,才會允許新的客人進入店內(nèi)。

我們可以用一個有容量限制的buffered channel來控制并發(fā),這類似于操作系統(tǒng)里的計數(shù)信號量概念。從概念上講,channel里的n個空槽代表n個可以處理內(nèi)容的token(通行證),從channel里接收一個值會釋放其中的一個token,并且生成一個新的空槽位。這樣保證了在沒有接收介入時最多有n個發(fā)送操作。(這里可能我們拿channel里填充的槽來做token更直觀一些,不過還是這樣吧。)由于channel里的元素類型并不重要,我們用一個零值的struct{}來作為其元素。

讓我們重寫crawl函數(shù),將對links.Extract的調(diào)用操作用獲取、釋放token的操作包裹起來,來確保同一時間對其只有20個調(diào)用。信號量數(shù)量和其能操作的IO資源數(shù)量應(yīng)保持接近。

gopl.io/ch8/crawl2

// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)

func crawl(url string) []string {
    fmt.Println(url)
    tokens <- struct{}{} // acquire a token
    list, err := links.Extract(url)
    <-tokens // release the token
    if err != nil {
        log.Print(err)
    }
    return list
}

第二個問題是這個程序永遠都不會終止,即使它已經(jīng)爬到了所有初始鏈接衍生出的鏈接。(當(dāng)然,除非你慎重地選擇了合適的初始化URL或者已經(jīng)實現(xiàn)了練習(xí)8.6中的深度限制,你應(yīng)該還沒有意識到這個問題。)為了使這個程序能夠終止,我們需要在worklist為空或者沒有crawl的goroutine在運行時退出主循環(huán)。

func main() {
    worklist := make(chan []string)
    var n int // number of pending sends to worklist

    // Start with the command-line arguments.
    n++
    go func() { worklist <- os.Args[1:] }()

    // Crawl the web concurrently.
    seen := make(map[string]bool)

    for ; n > 0; n-- {
        list := <-worklist
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                n++
                go func(link string) {
                    worklist <- crawl(link)
                }(link)
            }
        }
    }
}

這個版本中,計數(shù)器n對worklist的發(fā)送操作數(shù)量進行了限制。每一次我們發(fā)現(xiàn)有元素需要被發(fā)送到worklist時,我們都會對n進行++操作,在向worklist中發(fā)送初始的命令行參數(shù)之前,我們也進行過一次++操作。這里的操作++是在每啟動一個crawler的goroutine之前。主循環(huán)會在n減為0時終止,這時候說明沒活可干了。

現(xiàn)在這個并發(fā)爬蟲會比5.6節(jié)中的深度優(yōu)先搜索版快上20倍,而且不會出什么錯,并且在其完成任務(wù)時也會正確地終止。

下面的程序是避免過度并發(fā)的另一種思路。這個版本使用了原來的crawl函數(shù),但沒有使用計數(shù)信號量,取而代之用了20個常駐的crawler goroutine,這樣來保證最多20個HTTP請求在并發(fā)。

func main() {
    worklist := make(chan []string)  // lists of URLs, may have duplicates
    unseenLinks := make(chan string) // de-duplicated URLs

    // Add command-line arguments to worklist.
    go func() { worklist <- os.Args[1:] }()

    // Create 20 crawler goroutines to fetch each unseen link.
    for i := 0; i < 20; i++ {
        go func() {
            for link := range unseenLinks {
                foundLinks := crawl(link)
                go func() { worklist <- foundLinks }()
            }
        }()
    }

    // The main goroutine de-duplicates worklist items
    // and sends the unseen ones to the crawlers.
    seen := make(map[string]bool)
    for list := range worklist {
        for _, link := range list {
            if !seen[link] {
                seen[link] = true
                unseenLinks <- link
            }
        }
    }
}

所有的爬蟲goroutine現(xiàn)在都是被同一個channel - unseenLinks喂飽的了。主goroutine負責(zé)拆分它從worklist里拿到的元素,然后把沒有抓過的經(jīng)由unseenLinks channel發(fā)送給一個爬蟲的goroutine。

seen這個map被限定在main goroutine中;也就是說這個map只能在main goroutine中進行訪問。類似于其它的信息隱藏方式,這樣的約束可以讓我們從一定程度上保證程序的正確性。例如,內(nèi)部變量不能夠在函數(shù)外部被訪問到;變量(§2.3.4)在沒有發(fā)生變量逃逸(譯注:局部變量被全局變量引用地址導(dǎo)致變量被分配在堆上)的情況下是無法在函數(shù)外部訪問的;一個對象的封裝字段無法被該對象的方法以外的方法訪問到。在所有的情況下,信息隱藏都可以幫助我們約束我們的程序,使其不發(fā)生意料之外的情況。

crawl函數(shù)爬到的鏈接在一個專有的goroutine中被發(fā)送到worklist中來避免死鎖。為了節(jié)省篇幅,這個例子的終止問題我們先不進行詳細闡述了。

練習(xí) 8.6: 為并發(fā)爬蟲增加深度限制。也就是說,如果用戶設(shè)置了depth=3,那么只有從首頁跳轉(zhuǎn)三次以內(nèi)能夠跳到的頁面才能被抓取到。

練習(xí) 8.7: 完成一個并發(fā)程序來創(chuàng)建一個線上網(wǎng)站的本地鏡像,把該站點的所有可達的頁面都抓取到本地硬盤。為了省事,我們這里可以只取出現(xiàn)在該域下的所有頁面(比如golang.org開頭,譯注:外鏈的應(yīng)該就不算了。)當(dāng)然了,出現(xiàn)在頁面里的鏈接你也需要進行一些處理,使其能夠在你的鏡像站點上進行跳轉(zhuǎn),而不是指向原始的鏈接。

譯注: 拓展閱讀 Handling 1 Million Requests per Minute with Go。



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號