Go語言是一門天然支持并發(fā)的編程語言。 通過使用?go
?關(guān)鍵字,我們可以很輕松地創(chuàng)建協(xié)程;通過使用通道和Go中提供的其它各種同步技術(shù),并發(fā)編程變得簡(jiǎn)單、輕松和有趣。
另一方面,Go并不阻止程序員在并發(fā)編程中因?yàn)榇中幕蛘呓?jīng)驗(yàn)不足而犯錯(cuò)。 本文的余下部分將展示一些常見的并發(fā)錯(cuò)誤,來幫助Go程序員在實(shí)踐中避免這些錯(cuò)誤。
我們已經(jīng)知道,源文件中的代碼行在運(yùn)行時(shí)刻并非總是按照它們的出現(xiàn)次序被執(zhí)行。
下面這個(gè)示例程序犯了兩個(gè)錯(cuò)誤:
b
的讀取和匿名協(xié)程中的對(duì)變量b
的寫入可能會(huì)產(chǎn)生數(shù)據(jù)競(jìng)爭(zhēng);b == true
成立并不能確保條件a != nil
也成立。 編譯器和CPU可能會(huì)對(duì)調(diào)整此程序中匿名協(xié)程中的某些指令的順序已獲取更快的執(zhí)行速度。 所以,站在主協(xié)程的視角看,對(duì)變量b
的賦值可能會(huì)發(fā)生在對(duì)變量a
的賦值之前,這將造成在修改a
的元素時(shí)a
依然為一個(gè)nil切片。package main
import (
"time"
"runtime"
)
func main() {
var a []int // nil
var b bool // false
// 一個(gè)匿名協(xié)程。
go func () {
a = make([]int, 3)
b = true // 寫入b
}()
for !b { // 讀取b
time.Sleep(time.Second)
runtime.Gosched()
}
a[0], a[1], a[2] = 0, 1, 2 // 可能會(huì)發(fā)生恐慌
}
上面這個(gè)程序可能在很多計(jì)算機(jī)上運(yùn)行良好,但是可能會(huì)在某些計(jì)算機(jī)上因?yàn)榭只哦罎⑼顺?;或者使用某些編譯器編譯的時(shí)候運(yùn)行良好,但使用另外的某個(gè)編譯器編譯的時(shí)候?qū)⒃斐沙绦蜻\(yùn)行時(shí)崩潰退出。
我們應(yīng)該使用通道或者sync
標(biāo)準(zhǔn)庫包中的同步技術(shù)來確保內(nèi)存順序。比如:
package main
func main() {
var a []int = nil
c := make(chan struct{})
go func () {
a = make([]int, 3)
c <- struct{}{}
}()
<-c
a[0], a[1], a[2] = 0, 1, 2 // 絕不會(huì)造成恐慌
}
讓我們看一個(gè)簡(jiǎn)單的例子:
package main
import (
"fmt"
"time"
)
func main() {
var x = 123
go func() {
x = 789 // 寫入x
}()
time.Sleep(time.Second)
fmt.Println(x) // 讀取x
}
我們期望著此程序打印出789
。 事實(shí)上,則其運(yùn)行結(jié)果常常正如我們所期待的。 但是,此程序中的同步處理實(shí)現(xiàn)的正確嗎?否!原因很簡(jiǎn)單,Go運(yùn)行時(shí)并不能保證對(duì)x
的寫入一定發(fā)生在對(duì)x
的讀取之前。 在某些特定的情形下,比如CPU資源被很一些其它計(jì)算密集的程序所占用,則對(duì)x
的寫入有可能發(fā)生在對(duì)x
的讀取之后。 因此,我們不應(yīng)該在正式的項(xiàng)目中使用time.Sleep
調(diào)用來做同步。
讓我們看另一個(gè)簡(jiǎn)單的例子:
package main
import (
"fmt"
"time"
)
var x = 0
func main() {
var num = 123
var p = &num
c := make(chan int)
go func() {
c <- *p + x
}()
time.Sleep(time.Second)
num = 789
fmt.Println(<-c)
}
你覺得此程序會(huì)輸出什么?123
還是789
? 事實(shí)上,它的輸出是和具體使用的編譯器相關(guān)的。 對(duì)于標(biāo)準(zhǔn)編譯器1.19版本來說,它很可能輸出123
。 但是從理論上說,它也可能輸出789
。
讓我們將此例中的c <- *p + x
一行換成c <- *p
,然后重新運(yùn)行它,你將會(huì)發(fā)現(xiàn)它的輸出變成了789
(如果它使用標(biāo)準(zhǔn)編譯器1.19版本編譯的話)。 重申一次,此結(jié)果是和具體使用的編譯器和編譯器的版本相關(guān)的。
是的,此程序中存在數(shù)據(jù)競(jìng)爭(zhēng)。表達(dá)式*p
的估值可能發(fā)生在賦值num = 789
之前、之后、或者同時(shí)。 time.Sleep
調(diào)用并不能保證*p
的估值發(fā)生在此賦值之后。
對(duì)于這個(gè)特定的例子,我們應(yīng)該將欲發(fā)送的值在開啟新協(xié)程之前存儲(chǔ)在一個(gè)臨時(shí)變量中來避免數(shù)據(jù)競(jìng)爭(zhēng)。
...
tmp := *p
go func() {
c <- tmp
}()
...
有很多原因?qū)е履硞€(gè)協(xié)程永久阻塞,比如:
除了有時(shí)我們故意地將主協(xié)程永久阻塞以防止程序退出外,其它大多數(shù)造成協(xié)程永久阻塞的情況都不是我們所期待的。 Go運(yùn)行時(shí)很難分辨出一個(gè)處于阻塞狀態(tài)的協(xié)程是否將永久阻塞下去,所以Go運(yùn)行時(shí)不會(huì)釋放永久處于阻塞狀態(tài)的協(xié)程占用的資源。
在采用最快回應(yīng)通道用例中,如果被當(dāng)作future/promise來用的通道的容量不足夠大,則較慢回應(yīng)的協(xié)程在準(zhǔn)備發(fā)送回應(yīng)結(jié)果時(shí)將永久阻塞。 比如,下面的例子中,每個(gè)請(qǐng)求將導(dǎo)致4個(gè)協(xié)程永久阻塞。
func request() int {
c := make(chan int)
for i := 0; i < 5; i++ {
i := i
go func() {
c <- i // 4個(gè)協(xié)程將永久阻塞在這里
}()
}
return <-c
}
為了防止有4個(gè)協(xié)程永久阻塞,被當(dāng)作future/promise使用的通道的容量必須至少為4
.
在第二種“采用最快回應(yīng)”實(shí)現(xiàn)方法中,如果被當(dāng)作future/promise使用的通道是一個(gè)非緩沖通道(如下面的代碼所示),則有可能導(dǎo)致其通道的接收者可能會(huì)錯(cuò)過所有的回應(yīng)而導(dǎo)致處于永久阻塞狀態(tài)。
func request() int {
c := make(chan int)
for i := 0; i < 5; i++ {
i := i
go func() {
select {
case c <- i:
default:
}
}()
}
return <-c // 有可能永久阻塞在此
}
接收者協(xié)程可能會(huì)永久阻塞的原因是如果5個(gè)嘗試發(fā)送操作都發(fā)生在接收操作<-c
準(zhǔn)備好之前,亦即5個(gè)個(gè)嘗試發(fā)送操作都失敗了,則接收者協(xié)程將永遠(yuǎn)無值可接收(從而將處于永久阻塞狀態(tài))。
將通道c
改為一個(gè)緩沖通道,則至少會(huì)有一個(gè)嘗試發(fā)送將成功,從而接收者協(xié)程肯定不會(huì)永久阻塞。
在實(shí)踐中,sync
標(biāo)準(zhǔn)庫包中的類型(除了Locker
接口類型)的值不應(yīng)該被復(fù)制。 我們只應(yīng)該復(fù)制它們的指針值。
下面是一個(gè)有問題的并發(fā)編程的例子。 在此例子中,當(dāng)Counter.Value
方法被調(diào)用時(shí),一個(gè)Counter
屬主值將被復(fù)制,此屬主值的字段Mutex
也將被一同復(fù)制。 此復(fù)制并沒有被同步保護(hù),因此復(fù)制結(jié)果可能是不完整的,并非被復(fù)制的屬主值的一個(gè)快照。 即使此Mutex
字段得以僥幸完整復(fù)制,它的副本所保護(hù)的是對(duì)字段n
的一個(gè)副本的訪問,因此一般是沒有意義的。
import "sync"
type Counter struct {
sync.Mutex
n int64
}
// 此方法實(shí)現(xiàn)是沒問題的。
func (c *Counter) Increase(d int64) (r int64) {
c.Lock()
c.n += d
r = c.n
c.Unlock()
return
}
// 此方法的實(shí)現(xiàn)是有問題的。當(dāng)它被調(diào)用時(shí),
// 一個(gè)Counter屬主值將被復(fù)制。
func (c Counter) Value() (r int64) {
c.Lock()
r = c.n
c.Unlock()
return
}
我們應(yīng)該將Value
方法的屬主參數(shù)類型更改為指針類型*Counter
來避免復(fù)制sync.Mutex
值。
Go官方工具鏈中提供的go vet
命令將提示此例中的Value
方法的聲明可能是一個(gè)潛在的邏輯錯(cuò)誤。
每個(gè)sync.WaitGroup
值內(nèi)部維護(hù)著一個(gè)計(jì)數(shù)。此計(jì)數(shù)的初始值為0。 如果一個(gè)sync.WaitGroup
值的Wait
方法在此計(jì)數(shù)為0的時(shí)候被調(diào)用,則此調(diào)用不會(huì)阻塞,否則此調(diào)用將一直阻塞到此計(jì)數(shù)變?yōu)?為止。
為了讓一個(gè)WaitGroup
值的使用有意義,在此值的計(jì)數(shù)為0的情況下,對(duì)它的下一次Add
方法的調(diào)用必須出現(xiàn)在對(duì)它的下一次Wait
方法的調(diào)用之前。
比如,在下面的例子中,Add
方法的調(diào)用位置是不合適的。 此例子程序的打印結(jié)果并不總是100
,而可能是0
到100
間的任何一個(gè)值。 原因是沒有任何一個(gè)Add
方法調(diào)用可以確保發(fā)生在唯一的Wait
方法調(diào)用之前,結(jié)果導(dǎo)致沒有任何一個(gè)Done
方法調(diào)用可以確保發(fā)生在唯一的Wait
方法調(diào)用返回之前。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var wg sync.WaitGroup
var x int32 = 0
for i := 0; i < 100; i++ {
go func() {
wg.Add(1)
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
fmt.Println("等待片刻...")
wg.Wait()
fmt.Println(atomic.LoadInt32(&x))
}
我們應(yīng)該將對(duì)Add
方法的調(diào)用移出匿名協(xié)程之外,像下面這樣,使得任何一個(gè)Done
方法調(diào)用都確保發(fā)生在唯一的Wait
方法調(diào)用返回之前。
...
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&x, 1)
wg.Done()
}()
}
...
從通道用例大全一文中,我們了解到一些函數(shù)可以返回用做future/promise的通道結(jié)果。 假設(shè)fa
和fb
是這樣的兩個(gè)函數(shù),則下面的調(diào)用方式并沒有體現(xiàn)出這兩個(gè)函數(shù)的真正價(jià)值。
doSomethingWithFutureArguments(<-fa(), <-fb())
在上面這行調(diào)用中,兩個(gè)實(shí)參值(promise回應(yīng)結(jié)果)的生成實(shí)際上是串行進(jìn)行的,future/promise的價(jià)值沒有體現(xiàn)出來。
我們應(yīng)該像下面這樣調(diào)用這兩個(gè)函數(shù)來并發(fā)生成兩個(gè)回應(yīng)結(jié)果:
ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)
Go程序員常犯的一個(gè)錯(cuò)誤是關(guān)閉一個(gè)后續(xù)可能還會(huì)有協(xié)程向其發(fā)送數(shù)據(jù)的通道。 當(dāng)向一個(gè)已關(guān)閉的通道發(fā)送數(shù)據(jù)的時(shí)候,一個(gè)恐慌將產(chǎn)生。
這樣的錯(cuò)誤曾經(jīng)發(fā)生在一些很有名的項(xiàng)目中,比如Kubernetes項(xiàng)目中的這個(gè)bug和這個(gè)bug。
請(qǐng)閱讀此篇文章來了解如何安全和優(yōu)雅地關(guān)閉通道。
64位非方法原子操作中涉及到的實(shí)參地址必須為8字節(jié)對(duì)齊的。不滿足此條件的64位原子操作將造成一個(gè)恐慌。 對(duì)于標(biāo)準(zhǔn)編譯器,這樣的情形只可能發(fā)生在32位的架構(gòu)中。 從Go 1.19版本開始,我們可以使用64位方法原子操作來避免這一缺陷。 請(qǐng)閱讀內(nèi)存布局一文來獲知如何確保讓64位的整數(shù)值的地址在32位的架構(gòu)中8字節(jié)對(duì)齊。
time
標(biāo)準(zhǔn)庫包中的After
函數(shù)返回一個(gè)用做延遲通知的通道。 此函數(shù)給并發(fā)編程帶來了很多便利,但是它的每個(gè)調(diào)用都需要?jiǎng)?chuàng)建一個(gè)time.Timer
值,此新創(chuàng)建的Timer
值在傳遞給After
函數(shù)調(diào)用的時(shí)長(zhǎng)(實(shí)參)內(nèi)肯定不會(huì)被垃圾回收。
如果此函數(shù)在某個(gè)時(shí)段內(nèi)被多次頻繁調(diào)用,則可能導(dǎo)致積累很多尚未過期的Timer
值從而造成大量的內(nèi)存和計(jì)算消耗。
比如在下面這個(gè)例子中,如果longRunning
函數(shù)被調(diào)用并且在一分鐘內(nèi)有一百萬條消息到達(dá), 那么在某個(gè)特定的很小時(shí)間段(大概若干秒)內(nèi)將存在一百萬個(gè)活躍的Timer
值,即使其中只有一個(gè)是真正有用的。
import (
"fmt"
"time"
)
// 如果某兩個(gè)連續(xù)的消息的間隔大于一分鐘,此函數(shù)將返回。
func longRunning(messages <-chan string) {
for {
select {
case <-time.After(time.Minute):
return
case msg := <-messages:
fmt.Println(msg)
}
}
}
為了避免太多的Timer
值被創(chuàng)建,我們應(yīng)該只使用(并復(fù)用)一個(gè)Timer
值,像下面這樣:
func longRunning(messages <-chan string) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
for {
select {
case <-timer.C: // 過期了
return
case msg := <-messages:
fmt.Println(msg)
// 此if代碼塊很重要。
if !timer.Stop() {
<-timer.C
}
}
// 必須重置以復(fù)用。
timer.Reset(time.Minute)
}
}
注意,此示例中的if
代碼塊用來舍棄一個(gè)可能在執(zhí)行第二個(gè)分支代碼塊的時(shí)候發(fā)送過來的超時(shí)通知。
一個(gè)典型的time.Timer
的使用已經(jīng)在上一節(jié)中展示了。一些解釋:
Timer
值已經(jīng)過期或者已經(jīng)被終止(stopped),則相應(yīng)的Stop
方法調(diào)用返回false
。
在此Timer
值尚未終止的時(shí)候,Stop
方法調(diào)用返回false
只能意味著此Timer
值已經(jīng)過期。
Timer
值被終止之后,它的通道字段C
最多只能含有一個(gè)過期的通知。
Timer
終止(stopped)之后并且在重置和重用此Timer
值之前,我們應(yīng)該確保此Timer
值中肯定不存在過期的通知。
這就是上一節(jié)中的例子中的if
代碼塊的意義所在。
一個(gè)*Timer
值的Reset
方法必須在對(duì)應(yīng)Timer
值過期或者終止之后才能被調(diào)用; 否則,此Reset
方法調(diào)用和一個(gè)可能的向此Timer
值的C
通道字段的發(fā)送通知操作產(chǎn)生數(shù)據(jù)競(jìng)爭(zhēng)。
如果上一節(jié)中的例子中的select
流程控制代碼塊中的第一個(gè)分支被選中,則這表示相應(yīng)的Timer
值已經(jīng)過期,所以我們不必終止它。 但是我們必須在第二個(gè)分支中通過終止此Timer
以檢查此Timer
中是否存在一個(gè)過期的通知。 如果確實(shí)有一個(gè)過期的通知,我們必須在重用這個(gè)Timer
之前將此過期的通知取出;否則,此過期的通知將下一個(gè)循環(huán)步導(dǎo)致在第一個(gè)分支立即被選中。
比如,下面這個(gè)程序?qū)⒃谶\(yùn)行后大概一秒鐘(而不是十秒鐘)后退出。 而且此程序存在著潛在的數(shù)據(jù)競(jìng)爭(zhēng)。
package main
import (
"fmt"
"time"
)
func main() {
start := time.Now()
timer := time.NewTimer(time.Second/2)
select {
case <-timer.C:
default:
time.Sleep(time.Second) // 此分支被選中的可能性較大
}
timer.Reset(time.Second * 10) // 可能數(shù)據(jù)競(jìng)爭(zhēng)
<-timer.C
fmt.Println(time.Since(start)) // 大約1s
}
當(dāng)一個(gè)time.Timer
值不再被使用后,我們不必(但是推薦)終止之。
在多個(gè)協(xié)程中使用同一個(gè)time.Timer
值比較容易寫出不當(dāng)?shù)牟l(fā)代碼,所以盡量不要跨協(xié)程使用一個(gè)Timer
值。
我們不應(yīng)該依賴于time.Timer
的Reset
方法的返回值。此返回值只要是為了歷史兼容性而存在的。
更多建議: