原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch1-basic/ch1-06-goroutine.html
Go 語(yǔ)言最吸引人的地方是它內(nèi)建的并發(fā)支持。Go 語(yǔ)言并發(fā)體系的理論是 C.A.R Hoare 在 1978 年提出的 CSP(Communicating Sequential Process,通訊順序進(jìn)程)。CSP 有著精確的數(shù)學(xué)模型,并實(shí)際應(yīng)用在了 Hoare 參與設(shè)計(jì)的 T9000 通用計(jì)算機(jī)上。從 NewSqueak、Alef、Limbo 到現(xiàn)在的 Go 語(yǔ)言,對(duì)于對(duì) CSP 有著 20 多年實(shí)戰(zhàn)經(jīng)驗(yàn)的 Rob Pike 來(lái)說(shuō),他更關(guān)注的是將 CSP 應(yīng)用在通用編程語(yǔ)言上產(chǎn)生的潛力。作為 Go 并發(fā)編程核心的 CSP 理論的核心概念只有一個(gè):同步通信。關(guān)于同步通信的話題我們?cè)谇懊嬉还?jié)已經(jīng)講過(guò),本節(jié)我們將簡(jiǎn)單介紹下 Go 語(yǔ)言中常見(jiàn)的并發(fā)模式。
首先要明確一個(gè)概念:并發(fā)不是并行。并發(fā)更關(guān)注的是程序的設(shè)計(jì)層面,并發(fā)的程序完全是可以順序執(zhí)行的,只有在真正的多核 CPU 上才可能真正地同時(shí)運(yùn)行。并行更關(guān)注的是程序的運(yùn)行層面,并行一般是簡(jiǎn)單的大量重復(fù),例如 GPU 中對(duì)圖像處理都會(huì)有大量的并行運(yùn)算。為更好的編寫(xiě)并發(fā)程序,從設(shè)計(jì)之初 Go 語(yǔ)言就注重如何在編程語(yǔ)言層級(jí)上設(shè)計(jì)一個(gè)簡(jiǎn)潔安全高效的抽象模型,讓程序員專注于分解問(wèn)題和組合方案,而且不用被線程管理和信號(hào)互斥這些繁瑣的操作分散精力。
在并發(fā)編程中,對(duì)共享資源的正確訪問(wèn)需要精確的控制,在目前的絕大多數(shù)語(yǔ)言中,都是通過(guò)加鎖等線程同步方案來(lái)解決這一困難問(wèn)題,而 Go 語(yǔ)言卻另辟蹊徑,它將共享的值通過(guò) Channel 傳遞(實(shí)際上多個(gè)獨(dú)立執(zhí)行的線程很少主動(dòng)共享資源)。在任意給定的時(shí)刻,最好只有一個(gè) Goroutine 能夠擁有該資源。數(shù)據(jù)競(jìng)爭(zhēng)從設(shè)計(jì)層面上就被杜絕了。為了提倡這種思考方式,Go 語(yǔ)言將其并發(fā)編程哲學(xué)化為一句口號(hào):
Do not communicate by sharing memory; instead, share memory by communicating.
不要通過(guò)共享內(nèi)存來(lái)通信,而應(yīng)通過(guò)通信來(lái)共享內(nèi)存。
這是更高層次的并發(fā)編程哲學(xué)(通過(guò)管道來(lái)傳值是 Go 語(yǔ)言推薦的做法)。雖然像引用計(jì)數(shù)這類簡(jiǎn)單的并發(fā)問(wèn)題通過(guò)原子操作或互斥鎖就能很好地實(shí)現(xiàn),但是通過(guò) Channel 來(lái)控制訪問(wèn)能夠讓你寫(xiě)出更簡(jiǎn)潔正確的程序。
我們先以在一個(gè)新的 Goroutine 中輸出“Hello world”,main
等待后臺(tái)線程輸出工作完成之后退出,這樣一個(gè)簡(jiǎn)單的并發(fā)程序作為熱身。
并發(fā)編程的核心概念是同步通信,但是同步的方式卻有多種。我們先以大家熟悉的互斥量 sync.Mutex
來(lái)實(shí)現(xiàn)同步通信。根據(jù)文檔,我們不能直接對(duì)一個(gè)未加鎖狀態(tài)的 sync.Mutex
進(jìn)行解鎖,這會(huì)導(dǎo)致運(yùn)行時(shí)異常。下面這種方式并不能保證正常工作:
func main() {
var mu sync.Mutex
go func(){
fmt.Println("你好, 世界")
mu.Lock()
}()
mu.Unlock()
}
因?yàn)?nbsp;mu.Lock()
和 mu.Unlock()
并不在同一個(gè) Goroutine 中,所以也就不滿足順序一致性內(nèi)存模型。同時(shí)它們也沒(méi)有其它的同步事件可以參考,這兩個(gè)事件不可排序也就是可以并發(fā)的。因?yàn)榭赡苁遣l(fā)的事件,所以 main
函數(shù)中的 mu.Unlock()
很有可能先發(fā)生,而這個(gè)時(shí)刻 mu
互斥對(duì)象還處于未加鎖的狀態(tài),從而會(huì)導(dǎo)致運(yùn)行時(shí)異常。
下面是修復(fù)后的代碼:
func main() {
var mu sync.Mutex
mu.Lock()
go func(){
fmt.Println("你好, 世界")
mu.Unlock()
}()
mu.Lock()
}
修復(fù)的方式是在 main
函數(shù)所在線程中執(zhí)行兩次 mu.Lock()
,當(dāng)?shù)诙渭渔i時(shí)會(huì)因?yàn)殒i已經(jīng)被占用(不是遞歸鎖)而阻塞,main
函數(shù)的阻塞狀態(tài)驅(qū)動(dòng)后臺(tái)線程繼續(xù)向前執(zhí)行。當(dāng)后臺(tái)線程執(zhí)行到 mu.Unlock()
時(shí)解鎖,此時(shí)打印工作已經(jīng)完成了,解鎖會(huì)導(dǎo)致 main
函數(shù)中的第二個(gè) mu.Lock()
阻塞狀態(tài)取消,此時(shí)后臺(tái)線程和主線程再?zèng)]有其它的同步事件參考,它們退出的事件將是并發(fā)的:在 main
函數(shù)退出導(dǎo)致程序退出時(shí),后臺(tái)線程可能已經(jīng)退出了,也可能沒(méi)有退出。雖然無(wú)法確定兩個(gè)線程退出的時(shí)間,但是打印工作是可以正確完成的。
使用 sync.Mutex
互斥鎖同步是比較低級(jí)的做法。我們現(xiàn)在改用無(wú)緩存的管道來(lái)實(shí)現(xiàn)同步:
func main() {
done := make(chan int)
go func(){
fmt.Println("你好, 世界")
<-done
}()
done <- 1
}
根據(jù) Go 語(yǔ)言內(nèi)存模型規(guī)范,對(duì)于從無(wú)緩沖 Channel 進(jìn)行的接收,發(fā)生在對(duì)該 Channel 進(jìn)行的發(fā)送完成之前。因此,后臺(tái)線程 <-done
接收操作完成之后,main
線程的 done <- 1
發(fā)送操作才可能完成(從而退出 main、退出程序),而此時(shí)打印工作已經(jīng)完成了。
上面的代碼雖然可以正確同步,但是對(duì)管道的緩存大小太敏感:如果管道有緩存的話,就無(wú)法保證 main 退出之前后臺(tái)線程能正常打印了。更好的做法是將管道的發(fā)送和接收方向調(diào)換一下,這樣可以避免同步事件受管道緩存大小的影響:
func main() {
done := make(chan int, 1) // 帶緩存的管道
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
<-done
}
對(duì)于帶緩沖的 Channel,對(duì)于 Channel 的第 K 個(gè)接收完成操作發(fā)生在第 K+C 個(gè)發(fā)送操作完成之前,其中 C 是 Channel 的緩存大小。雖然管道是帶緩存的,main
線程接收完成是在后臺(tái)線程發(fā)送開(kāi)始但還未完成的時(shí)刻,此時(shí)打印工作也是已經(jīng)完成的。
基于帶緩存的管道,我們可以很容易將打印線程擴(kuò)展到 N 個(gè)。下面的例子是開(kāi)啟 10 個(gè)后臺(tái)線程分別打?。?
func main() {
done := make(chan int, 10) // 帶 10 個(gè)緩存
// 開(kāi) N 個(gè)后臺(tái)打印線程
for i := 0; i < cap(done); i++ {
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
}
// 等待 N 個(gè)后臺(tái)線程完成
for i := 0; i < cap(done); i++ {
<-done
}
}
對(duì)于這種要等待 N 個(gè)線程完成后再進(jìn)行下一步的同步操作有一個(gè)簡(jiǎn)單的做法,就是使用 sync.WaitGroup
來(lái)等待一組事件:
func main() {
var wg sync.WaitGroup
// 開(kāi) N 個(gè)后臺(tái)打印線程
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
fmt.Println("你好, 世界")
wg.Done()
}()
}
// 等待 N 個(gè)后臺(tái)線程完成
wg.Wait()
}
其中 wg.Add(1)
用于增加等待事件的個(gè)數(shù),必須確保在后臺(tái)線程啟動(dòng)之前執(zhí)行(如果放到后臺(tái)線程之中執(zhí)行則不能保證被正常執(zhí)行到)。當(dāng)后臺(tái)線程完成打印工作之后,調(diào)用 wg.Done()
表示完成一個(gè)事件。main
函數(shù)的 wg.Wait()
是等待全部的事件完成。
并發(fā)編程中最常見(jiàn)的例子就是生產(chǎn)者消費(fèi)者模式,該模式主要通過(guò)平衡生產(chǎn)線程和消費(fèi)線程的工作能力來(lái)提高程序的整體處理數(shù)據(jù)的速度。簡(jiǎn)單地說(shuō),就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù),然后放到成果隊(duì)列中,同時(shí)消費(fèi)者從成果隊(duì)列中來(lái)取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費(fèi)變成了異步的兩個(gè)過(guò)程。當(dāng)成果隊(duì)列中沒(méi)有數(shù)據(jù)時(shí),消費(fèi)者就進(jìn)入饑餓的等待中;而當(dāng)成果隊(duì)列中數(shù)據(jù)已滿時(shí),生產(chǎn)者則面臨因產(chǎn)品擠壓導(dǎo)致 CPU 被剝奪的下崗問(wèn)題。
Go 語(yǔ)言實(shí)現(xiàn)生產(chǎn)者消費(fèi)者并發(fā)很簡(jiǎn)單:
// 生產(chǎn)者: 生成 factor 整數(shù)倍的序列
func Producer(factor int, out chan<- int) {
for i := 0; ; i++ {
out <- i*factor
}
}
// 消費(fèi)者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64) // 成果隊(duì)列
go Producer(3, ch) // 生成 3 的倍數(shù)的序列
go Producer(5, ch) // 生成 5 的倍數(shù)的序列
go Consumer(ch) // 消費(fèi)生成的隊(duì)列
// 運(yùn)行一定時(shí)間后退出
time.Sleep(5 * time.Second)
}
我們開(kāi)啟了 2 個(gè) Producer
生產(chǎn)流水線,分別用于生成 3 和 5 的倍數(shù)的序列。然后開(kāi)啟 1 個(gè) Consumer
消費(fèi)者線程,打印獲取的結(jié)果。我們通過(guò)在 main
函數(shù)休眠一定的時(shí)間來(lái)讓生產(chǎn)者和消費(fèi)者工作一定時(shí)間。正如前面一節(jié)說(shuō)的,這種靠休眠方式是無(wú)法保證穩(wěn)定的輸出結(jié)果的。
我們可以讓 main
函數(shù)保存阻塞狀態(tài)不退出,只有當(dāng)用戶輸入 Ctrl-C
時(shí)才真正退出程序:
func main() {
ch := make(chan int, 64) // 成果隊(duì)列
go Producer(3, ch) // 生成 3 的倍數(shù)的序列
go Producer(5, ch) // 生成 5 的倍數(shù)的序列
go Consumer(ch) // 消費(fèi) 生成的隊(duì)列
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}
我們這個(gè)例子中有 2 個(gè)生產(chǎn)者,并且 2 個(gè)生產(chǎn)者之間并無(wú)同步事件可參考,它們是并發(fā)的。因此,消費(fèi)者輸出的結(jié)果序列的順序是不確定的,這并沒(méi)有問(wèn)題,生產(chǎn)者和消費(fèi)者依然可以相互配合工作。
發(fā)布訂閱(publish-and-subscribe)模型通常被簡(jiǎn)寫(xiě)為 pub/sub 模型。在這個(gè)模型中,消息生產(chǎn)者成為發(fā)布者(publisher),而消息消費(fèi)者則成為訂閱者(subscriber),生產(chǎn)者和消費(fèi)者是 M:N 的關(guān)系。在傳統(tǒng)生產(chǎn)者和消費(fèi)者模型中,是將消息發(fā)送到一個(gè)隊(duì)列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個(gè)主題。
為此,我們構(gòu)建了一個(gè)名為 pubsub
的發(fā)布訂閱模型支持包:
// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} // 訂閱者為一個(gè)管道
topicFunc func(v interface{}) bool // 主題為一個(gè)過(guò)濾器
)
// 發(fā)布者對(duì)象
type Publisher struct {
m sync.RWMutex // 讀寫(xiě)鎖
buffer int // 訂閱隊(duì)列的緩存大小
timeout time.Duration // 發(fā)布超時(shí)時(shí)間
subscribers map[subscriber]topicFunc // 訂閱者信息
}
// 構(gòu)建一個(gè)發(fā)布者對(duì)象, 可以設(shè)置發(fā)布超時(shí)時(shí)間和緩存隊(duì)列的長(zhǎng)度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一個(gè)新的訂閱者,訂閱全部主題
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一個(gè)新的訂閱者,訂閱過(guò)濾器篩選后的主題
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 退出訂閱
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 發(fā)布一個(gè)主題
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 關(guān)閉發(fā)布者對(duì)象,同時(shí)關(guān)閉所有的訂閱者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 發(fā)送主題,可以容忍一定的超時(shí)
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
下面的例子中,有兩個(gè)訂閱者分別訂閱了全部主題和含有"golang"的主題:
import "path/to/pubsub"
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
defer p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
p.Publish("hello, world!")
p.Publish("hello, golang!")
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} ()
// 運(yùn)行一定時(shí)間后退出
time.Sleep(3 * time.Second)
}
在發(fā)布訂閱模型中,每條消息都會(huì)傳送給多個(gè)訂閱者。發(fā)布者通常不會(huì)知道、也不關(guān)心哪一個(gè)訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運(yùn)行時(shí)動(dòng)態(tài)添加,是一種松散的耦合關(guān)系,這使得系統(tǒng)的復(fù)雜性可以隨時(shí)間的推移而增長(zhǎng)。在現(xiàn)實(shí)生活中,像天氣預(yù)報(bào)之類的應(yīng)用就可以應(yīng)用這個(gè)并發(fā)模式。
很多用戶在適應(yīng)了 Go 語(yǔ)言強(qiáng)大的并發(fā)特性之后,都傾向于編寫(xiě)最大并發(fā)的程序,因?yàn)檫@樣似乎可以提供最大的性能。在現(xiàn)實(shí)中我們行色匆匆,但有時(shí)卻需要我們放慢腳步享受生活,并發(fā)的程序也是一樣:有時(shí)候我們需要適當(dāng)?shù)乜刂撇l(fā)的程度,因?yàn)檫@樣不僅僅可給其它的應(yīng)用/任務(wù)讓出/預(yù)留一定的 CPU 資源,也可以適當(dāng)降低功耗緩解電池的壓力。
在 Go 語(yǔ)言自帶的 godoc 程序?qū)崿F(xiàn)中有一個(gè) vfs
的包對(duì)應(yīng)虛擬的文件系統(tǒng),在 vfs
包下面有一個(gè) gatefs
的子包,gatefs
子包的目的就是為了控制訪問(wèn)該虛擬文件系統(tǒng)的最大并發(fā)數(shù)。gatefs
包的應(yīng)用很簡(jiǎn)單:
import (
"golang.org/x/tools/godoc/vfs"
"golang.org/x/tools/godoc/vfs/gatefs"
)
func main() {
fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
// ...
}
其中 vfs.OS("/path")
基于本地文件系統(tǒng)構(gòu)造一個(gè)虛擬的文件系統(tǒng),然后 gatefs.New
基于現(xiàn)有的虛擬文件系統(tǒng)構(gòu)造一個(gè)并發(fā)受控的虛擬文件系統(tǒng)。并發(fā)數(shù)控制的原理在前面一節(jié)已經(jīng)講過(guò),就是通過(guò)帶緩存管道的發(fā)送和接收規(guī)則來(lái)實(shí)現(xiàn)最大并發(fā)阻塞:
var limit = make(chan int, 3)
func main() {
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
select{}
}
不過(guò) gatefs
對(duì)此做一個(gè)抽象類型 gate
,增加了 enter
和 leave
方法分別對(duì)應(yīng)并發(fā)代碼的進(jìn)入和離開(kāi)。當(dāng)超出并發(fā)數(shù)目限制的時(shí)候,enter
方法會(huì)阻塞直到并發(fā)數(shù)降下來(lái)為止。
type gate chan bool
func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }
gatefs
包裝的新的虛擬文件系統(tǒng)就是將需要控制并發(fā)的方法增加了 enter
和 leave
調(diào)用而已:
type gatefs struct {
fs vfs.FileSystem
gate
}
func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
fs.enter()
defer fs.leave()
return fs.fs.Lstat(p)
}
我們不僅可以控制最大的并發(fā)數(shù)目,而且可以通過(guò)帶緩存 Channel 的使用量和最大容量比例來(lái)判斷程序運(yùn)行的并發(fā)率。當(dāng)管道為空的時(shí)候可以認(rèn)為是空閑狀態(tài),當(dāng)管道滿了時(shí)任務(wù)是繁忙狀態(tài),這對(duì)于后臺(tái)一些低級(jí)任務(wù)的運(yùn)行是有參考價(jià)值的。
采用并發(fā)編程的動(dòng)機(jī)有很多:并發(fā)編程可以簡(jiǎn)化問(wèn)題,比如一類問(wèn)題對(duì)應(yīng)一個(gè)處理線程會(huì)更簡(jiǎn)單;并發(fā)編程還可以提升性能,在一個(gè)多核 CPU 上開(kāi) 2 個(gè)線程一般會(huì)比開(kāi) 1 個(gè)線程快一些。其實(shí)對(duì)于提升性能而言,程序并不是簡(jiǎn)單地運(yùn)行速度快就表示用戶體驗(yàn)好的;很多時(shí)候程序能快速響應(yīng)用戶請(qǐng)求才是最重要的,當(dāng)沒(méi)有用戶請(qǐng)求需要處理的時(shí)候才合適處理一些低優(yōu)先級(jí)的后臺(tái)任務(wù)。
假設(shè)我們想快速地搜索“golang”相關(guān)的主題,我們可能會(huì)同時(shí)打開(kāi) Bing、Google 或百度等多個(gè)檢索引擎。當(dāng)某個(gè)搜索最先返回結(jié)果后,就可以關(guān)閉其它搜索頁(yè)面了。因?yàn)槭芫W(wǎng)絡(luò)環(huán)境和搜索引擎算法的影響,某些搜索引擎可能很快返回搜索結(jié)果,某些搜索引擎也可能等到他們公司倒閉也沒(méi)有完成搜索。我們可以采用類似的策略來(lái)編寫(xiě)這個(gè)程序:
func main() {
ch := make(chan string, 32)
go func() {
ch <- searchByBing("golang")
}()
go func() {
ch <- searchByGoogle("golang")
}()
go func() {
ch <- searchByBaidu("golang")
}()
fmt.Println(<-ch)
}
首先,我們創(chuàng)建了一個(gè)帶緩存的管道,管道的緩存數(shù)目要足夠大,保證不會(huì)因?yàn)榫彺娴娜萘恳鸩槐匾淖枞H缓笪覀冮_(kāi)啟了多個(gè)后臺(tái)線程,分別向不同的搜索引擎提交搜索請(qǐng)求。當(dāng)任意一個(gè)搜索引擎最先有結(jié)果之后,都會(huì)馬上將結(jié)果發(fā)到管道中(因?yàn)楣艿缼Я俗銐虻木彺?,這個(gè)過(guò)程不會(huì)阻塞)。但是最終我們只從管道取第一個(gè)結(jié)果,也就是最先返回的結(jié)果。
通過(guò)適當(dāng)開(kāi)啟一些冗余的線程,嘗試用不同途徑去解決同樣的問(wèn)題,最終以贏者為王的方式提升了程序的相應(yīng)性能。
在“Hello world 的革命”一節(jié)中,我們?yōu)榱搜菔?Newsqueak 的并發(fā)特性,文中給出了并發(fā)版本素?cái)?shù)篩的實(shí)現(xiàn)。并發(fā)版本的素?cái)?shù)篩是一個(gè)經(jīng)典的并發(fā)例子,通過(guò)它我們可以更深刻地理解 Go 語(yǔ)言的并發(fā)特性?!八?cái)?shù)篩”的原理如圖:
圖 1-13 素?cái)?shù)篩
我們需要先生成最初的 2, 3, 4, ...
自然數(shù)序列(不包含開(kāi)頭的 0、1):
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
GenerateNatural
函數(shù)內(nèi)部啟動(dòng)一個(gè) Goroutine 生產(chǎn)序列,返回對(duì)應(yīng)的管道。
然后是為每個(gè)素?cái)?shù)構(gòu)造一個(gè)篩子:將輸入序列中是素?cái)?shù)倍數(shù)的數(shù)提出,并返回新的序列,是一個(gè)新的管道。
// 管道過(guò)濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}
PrimeFilter
函數(shù)也是內(nèi)部啟動(dòng)一個(gè) Goroutine 生產(chǎn)序列,返回過(guò)濾后序列對(duì)應(yīng)的管道。
現(xiàn)在我們可以在 main
函數(shù)中驅(qū)動(dòng)這個(gè)并發(fā)的素?cái)?shù)篩了:
func main() {
ch := GenerateNatural() // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ch, prime) // 基于新素?cái)?shù)構(gòu)造的過(guò)濾器
}
}
我們先是調(diào)用 GenerateNatural()
生成最原始的從 2 開(kāi)始的自然數(shù)序列。然后開(kāi)始一個(gè) 100 次迭代的循環(huán),希望生成 100 個(gè)素?cái)?shù)。在每次循環(huán)迭代開(kāi)始的時(shí)候,管道中的第一個(gè)數(shù)必定是素?cái)?shù),我們先讀取并打印這個(gè)素?cái)?shù)。然后基于管道中剩余的數(shù)列,并以當(dāng)前取出的素?cái)?shù)為篩子過(guò)濾后面的素?cái)?shù)。不同的素?cái)?shù)篩子對(duì)應(yīng)的管道是串聯(lián)在一起的。
素?cái)?shù)篩展示了一種優(yōu)雅的并發(fā)程序結(jié)構(gòu)。但是因?yàn)槊總€(gè)并發(fā)體處理的任務(wù)粒度太細(xì)微,程序整體的性能并不理想。對(duì)于細(xì)粒度的并發(fā)程序,CSP 模型中固有的消息傳遞的代價(jià)太高了(多線程并發(fā)模型同樣要面臨線程啟動(dòng)的代價(jià))。
有時(shí)候我們需要通知 Goroutine 停止它正在干的事情,特別是當(dāng)它工作在錯(cuò)誤的方向上的時(shí)候。Go 語(yǔ)言并沒(méi)有提供在一個(gè)直接終止 Goroutine 的方法,由于這樣會(huì)導(dǎo)致 Goroutine 之間的共享變量處在未定義的狀態(tài)上。但是如果我們想要退出兩個(gè)或者任意多個(gè) Goroutine 怎么辦呢?
Go 語(yǔ)言中不同 Goroutine 之間主要依靠管道進(jìn)行通信和同步。要同時(shí)處理多個(gè)管道的發(fā)送或接收操作,我們需要使用 select
關(guān)鍵字(這個(gè)關(guān)鍵字和網(wǎng)絡(luò)編程中的 select
函數(shù)的行為類似)。當(dāng) select
有多個(gè)分支時(shí),會(huì)隨機(jī)選擇一個(gè)可用的管道分支,如果沒(méi)有可用的管道分支則選擇 default
分支,否則會(huì)一直保存阻塞狀態(tài)。
基于 select
實(shí)現(xiàn)的管道的超時(shí)判斷:
select {
case v := <-in:
fmt.Println(v)
case <-time.After(time.Second):
return // 超時(shí)
}
通過(guò) select
的 default
分支實(shí)現(xiàn)非阻塞的管道發(fā)送或接收操作:
select {
case v := <-in:
fmt.Println(v)
default:
// 沒(méi)有數(shù)據(jù)
}
通過(guò) select
來(lái)阻止 main
函數(shù)退出:
func main() {
// do some thins
select{}
}
當(dāng)有多個(gè)管道均可操作時(shí),select
會(huì)隨機(jī)選擇一個(gè)管道?;谠撎匦晕覀兛梢杂?nbsp;select
實(shí)現(xiàn)一個(gè)生成隨機(jī)數(shù)序列的程序:
func main() {
ch := make(chan int)
go func() {
for {
select {
case ch <- 0:
case ch <- 1:
}
}
}()
for v := range ch {
fmt.Println(v)
}
}
我們通過(guò) select
和 default
分支可以很容易實(shí)現(xiàn)一個(gè) Goroutine 的退出控制:
func worker(cancel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cancel:
// 退出
}
}
}
func main() {
cancel := make(chan bool)
go worker(cancel)
time.Sleep(time.Second)
cancel <- true
}
但是管道的發(fā)送操作和接收操作是一一對(duì)應(yīng)的,如果要停止多個(gè) Goroutine 那么可能需要?jiǎng)?chuàng)建同樣數(shù)量的管道,這個(gè)代價(jià)太大了。其實(shí)我們可以通過(guò) close
關(guān)閉一個(gè)管道來(lái)實(shí)現(xiàn)廣播的效果,所有從關(guān)閉管道接收的操作均會(huì)收到一個(gè)零值和一個(gè)可選的失敗標(biāo)志。
func worker(cancel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cancel:
// 退出
}
}
}
func main() {
cancel := make(chan bool)
for i := 0; i < 10; i++ {
go worker(cancel)
}
time.Sleep(time.Second)
close(cancel)
}
我們通過(guò) close
來(lái)關(guān)閉 cancel
管道向多個(gè) Goroutine 廣播退出的指令。不過(guò)這個(gè)程序依然不夠穩(wěn)?。寒?dāng)每個(gè) Goroutine 收到退出指令退出時(shí)一般會(huì)進(jìn)行一定的清理工作,但是退出的清理工作并不能保證被完成,因?yàn)?nbsp;main
線程并沒(méi)有等待各個(gè)工作 Goroutine 退出工作完成的機(jī)制。我們可以結(jié)合 sync.WaitGroup
來(lái)改進(jìn):
func worker(wg *sync.WaitGroup, cancel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cancel:
return
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel)
wg.Wait()
}
現(xiàn)在每個(gè)工作者并發(fā)體的創(chuàng)建、運(yùn)行、暫停和退出都是在 main
函數(shù)的安全控制之下了。
在 Go1.7 發(fā)布時(shí),標(biāo)準(zhǔn)庫(kù)增加了一個(gè) context
包,用來(lái)簡(jiǎn)化對(duì)于處理單個(gè)請(qǐng)求的多個(gè) Goroutine 之間與請(qǐng)求域的數(shù)據(jù)、超時(shí)和退出等操作,官方有博文對(duì)此做了專門(mén)介紹。我們可以用 context
包來(lái)重新實(shí)現(xiàn)前面的線程安全退出或超時(shí)的控制:
func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done():
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}
當(dāng)并發(fā)體超時(shí)或 main
主動(dòng)停止工作者 Goroutine 時(shí),每個(gè)工作者都可以安全退出。
Go 語(yǔ)言是帶內(nèi)存自動(dòng)回收特性的,因此內(nèi)存一般不會(huì)泄漏。在前面素?cái)?shù)篩的例子中,GenerateNatural
和 PrimeFilter
函數(shù)內(nèi)部都啟動(dòng)了新的 Goroutine,當(dāng) main
函數(shù)不再使用管道時(shí)后臺(tái) Goroutine 有泄漏的風(fēng)險(xiǎn)。我們可以通過(guò) context
包來(lái)避免這個(gè)問(wèn)題,下面是改進(jìn)的素?cái)?shù)篩實(shí)現(xiàn):
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道過(guò)濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
select {
case <- ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
// 通過(guò) Context 控制后臺(tái) Goroutine 狀態(tài)
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx) // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ctx, ch, prime) // 基于新素?cái)?shù)構(gòu)造的過(guò)濾器
}
cancel()
}
當(dāng) main 函數(shù)完成工作前,通過(guò)調(diào)用 cancel()
來(lái)通知后臺(tái) Goroutine 退出,這樣就避免了 Goroutine 的泄漏。
然而,上面這個(gè)例子只是展示了 cancel()
的基礎(chǔ)用法,實(shí)際上這個(gè)例子會(huì)導(dǎo)致 Goroutine 死鎖,不能正常退出。 我們可以給上面這個(gè)例子添加 sync.WaitGroup
來(lái)復(fù)現(xiàn)這個(gè)問(wèn)題。
package main
import (
"context"
"fmt"
"sync"
)
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context, wg *sync.WaitGroup) chan int {
ch := make(chan int)
go func() {
defer wg.Done()
for i := 2; ; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道過(guò)濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGroup) chan int {
out := make(chan int)
go func() {
defer wg.Done()
for {
if i := <-in; i%prime != 0 {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
wg := sync.WaitGroup{}
// 通過(guò) Context 控制后臺(tái) Goroutine 狀態(tài)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
ch := GenerateNatural(ctx, &wg) // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
wg.Add(1)
ch = PrimeFilter(ctx, ch, prime, &wg) // 基于新素?cái)?shù)構(gòu)造的過(guò)濾器
}
cancel()
wg.Wait()
}
執(zhí)行上面這個(gè)例子很容易就復(fù)現(xiàn)了死鎖的問(wèn)題,原因是素?cái)?shù)篩中的 ctx.Done()
位于 if i := <-in; i%prime != 0
判斷之內(nèi), 而這個(gè)判斷可能會(huì)一直阻塞,導(dǎo)致 Goroutine 無(wú)法正常退出。讓我們來(lái)解決這個(gè)問(wèn)題。
package main
import (
"context"
"fmt"
"sync"
)
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context, wg *sync.WaitGroup) chan int {
ch := make(chan int)
go func() {
defer wg.Done()
for i := 2; ; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道過(guò)濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGroup) chan int {
out := make(chan int)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case i := <-in:
if i%prime != 0 {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}
}
}()
return out
}
func main() {
wg := sync.WaitGroup{}
// 通過(guò) Context 控制后臺(tái) Goroutine 狀態(tài)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
ch := GenerateNatural(ctx, &wg) // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
wg.Add(1)
ch = PrimeFilter(ctx, ch, prime, &wg) // 基于新素?cái)?shù)構(gòu)造的過(guò)濾器
}
cancel()
wg.Wait()
}
如上所示,我們可以通過(guò)將 i := <-in
放入 select,在這個(gè) select 內(nèi)也執(zhí)行 <-ctx.Done()
來(lái)解決阻塞導(dǎo)致的死鎖。 不過(guò)上面這個(gè)例子并不優(yōu)美,讓我們換一種方式。
package main
import (
"context"
"fmt"
"sync"
)
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context, wg *sync.WaitGroup) chan int {
ch := make(chan int)
go func() {
defer wg.Done()
defer close(ch)
for i := 2; ; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道過(guò)濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGroup) chan int {
out := make(chan int)
go func() {
defer wg.Done()
defer close(out)
for i := range in {
if i%prime != 0 {
select {
case <-ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
wg := sync.WaitGroup{}
// 通過(guò) Context 控制后臺(tái) Goroutine 狀態(tài)
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
ch := GenerateNatural(ctx, &wg) // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
wg.Add(1)
ch = PrimeFilter(ctx, ch, prime, &wg) // 基于新素?cái)?shù)構(gòu)造的過(guò)濾器
}
cancel()
wg.Wait()
}
在上面這個(gè)例子中主要有以下幾點(diǎn)需要關(guān)注:
for range
? 循環(huán)保證了輸入管道被關(guān)閉時(shí),循環(huán)能退出,不會(huì)出現(xiàn)死循環(huán);defer close
? 保證了無(wú)論是輸入管道被關(guān)閉,還是 ctx 被取消,只要素?cái)?shù)篩退出,都會(huì)關(guān)閉輸出管道。至此,我們終于足夠優(yōu)美地解決了這個(gè)死鎖問(wèn)題。
并發(fā)是一個(gè)非常大的主題,我們這里只是展示幾個(gè)非?;A(chǔ)的并發(fā)編程的例子。官方文檔也有很多關(guān)于并發(fā)編程的討論,國(guó)內(nèi)也有專門(mén)討論 Go 語(yǔ)言并發(fā)編程的書(shū)籍。讀者可以根據(jù)自己的需求查閱相關(guān)的文獻(xiàn)。
![]() | ![]() |
更多建議: