Go語言 一些常見并發(fā)編程錯(cuò)誤

2023-02-16 17:40 更新

Go語言是一門天然支持并發(fā)的編程語言。 通過使用?go?關(guān)鍵字,我們可以很輕松地創(chuàng)建協(xié)程;通過使用通道Go中提供的其它各種同步技術(shù),并發(fā)編程變得簡(jiǎn)單、輕松和有趣。

另一方面,Go并不阻止程序員在并發(fā)編程中因?yàn)榇中幕蛘呓?jīng)驗(yàn)不足而犯錯(cuò)。 本文的余下部分將展示一些常見的并發(fā)錯(cuò)誤,來幫助Go程序員在實(shí)踐中避免這些錯(cuò)誤。

當(dāng)需要同步的時(shí)候沒有同步

我們已經(jīng)知道,源文件中的代碼行在運(yùn)行時(shí)刻并非總是按照它們的出現(xiàn)次序被執(zhí)行。

下面這個(gè)示例程序犯了兩個(gè)錯(cuò)誤:

  • 首先,主協(xié)程中對(duì)變量b的讀取和匿名協(xié)程中的對(duì)變量b的寫入可能會(huì)產(chǎn)生數(shù)據(jù)競(jìng)爭(zhēng);
  • 其次,在主協(xié)程中,條件b == true成立并不能確保條件a != nil也成立。 編譯器和CPU可能會(huì)對(duì)調(diào)整此程序中匿名協(xié)程中的某些指令的順序已獲取更快的執(zhí)行速度。 所以,站在主協(xié)程的視角看,對(duì)變量b的賦值可能會(huì)發(fā)生在對(duì)變量a的賦值之前,這將造成在修改a的元素時(shí)a依然為一個(gè)nil切片。
package main

import (
	"time"
	"runtime"
)

func main() {
	var a []int // nil
	var b bool  // false

	// 一個(gè)匿名協(xié)程。
	go func () {
		a = make([]int, 3)
		b = true // 寫入b
	}()

	for !b { // 讀取b
		time.Sleep(time.Second)
		runtime.Gosched()
	}
	a[0], a[1], a[2] = 0, 1, 2 // 可能會(huì)發(fā)生恐慌
}

上面這個(gè)程序可能在很多計(jì)算機(jī)上運(yùn)行良好,但是可能會(huì)在某些計(jì)算機(jī)上因?yàn)榭只哦罎⑼顺?;或者使用某些編譯器編譯的時(shí)候運(yùn)行良好,但使用另外的某個(gè)編譯器編譯的時(shí)候?qū)⒃斐沙绦蜻\(yùn)行時(shí)崩潰退出。

我們應(yīng)該使用通道或者sync標(biāo)準(zhǔn)庫包中的同步技術(shù)來確保內(nèi)存順序。比如:

package main

func main() {
	var a []int = nil
	c := make(chan struct{})

	go func () {
		a = make([]int, 3)
		c <- struct{}{}
	}()

	<-c
	a[0], a[1], a[2] = 0, 1, 2 // 絕不會(huì)造成恐慌
}

使用time.Sleep調(diào)用來做同步

讓我們看一個(gè)簡(jiǎn)單的例子:

package main

import (
	"fmt"
	"time"
)

func main() {
	var x = 123

	go func() {
		x = 789 // 寫入x
	}()

	time.Sleep(time.Second)
	fmt.Println(x) // 讀取x
}

我們期望著此程序打印出789。 事實(shí)上,則其運(yùn)行結(jié)果常常正如我們所期待的。 但是,此程序中的同步處理實(shí)現(xiàn)的正確嗎?否!原因很簡(jiǎn)單,Go運(yùn)行時(shí)并不能保證對(duì)x的寫入一定發(fā)生在對(duì)x的讀取之前。 在某些特定的情形下,比如CPU資源被很一些其它計(jì)算密集的程序所占用,則對(duì)x的寫入有可能發(fā)生在對(duì)x的讀取之后。 因此,我們不應(yīng)該在正式的項(xiàng)目中使用time.Sleep調(diào)用來做同步。

讓我們看另一個(gè)簡(jiǎn)單的例子:

package main

import (
	"fmt"
	"time"
)

var x = 0

func main() {
	var num = 123
	var p = &num

	c := make(chan int)

	go func() {
		c <- *p + x
	}()

	time.Sleep(time.Second)
	num = 789
	fmt.Println(<-c)
}

你覺得此程序會(huì)輸出什么?123還是789? 事實(shí)上,它的輸出是和具體使用的編譯器相關(guān)的。 對(duì)于標(biāo)準(zhǔn)編譯器1.19版本來說,它很可能輸出123。 但是從理論上說,它也可能輸出789

讓我們將此例中的c <- *p + x一行換成c <- *p,然后重新運(yùn)行它,你將會(huì)發(fā)現(xiàn)它的輸出變成了789(如果它使用標(biāo)準(zhǔn)編譯器1.19版本編譯的話)。 重申一次,此結(jié)果是和具體使用的編譯器和編譯器的版本相關(guān)的。

是的,此程序中存在數(shù)據(jù)競(jìng)爭(zhēng)。表達(dá)式*p的估值可能發(fā)生在賦值num = 789之前、之后、或者同時(shí)。 time.Sleep調(diào)用并不能保證*p的估值發(fā)生在此賦值之后。

對(duì)于這個(gè)特定的例子,我們應(yīng)該將欲發(fā)送的值在開啟新協(xié)程之前存儲(chǔ)在一個(gè)臨時(shí)變量中來避免數(shù)據(jù)競(jìng)爭(zhēng)。

...
	tmp := *p
	go func() {
		c <- tmp
	}()
...

使一些協(xié)程永久處于阻塞狀態(tài)

有很多原因?qū)е履硞€(gè)協(xié)程永久阻塞,比如:

  • 從一個(gè)永遠(yuǎn)不會(huì)有其它協(xié)程向其發(fā)送數(shù)據(jù)的通道接收數(shù)據(jù);
  • 向一個(gè)永遠(yuǎn)不會(huì)有其它協(xié)程從中讀取數(shù)據(jù)的通道發(fā)送數(shù)據(jù);
  • 被自己死鎖了;
  • 和其它協(xié)程相互死鎖了;
  • 等等。

除了有時(shí)我們故意地將主協(xié)程永久阻塞以防止程序退出外,其它大多數(shù)造成協(xié)程永久阻塞的情況都不是我們所期待的。 Go運(yùn)行時(shí)很難分辨出一個(gè)處于阻塞狀態(tài)的協(xié)程是否將永久阻塞下去,所以Go運(yùn)行時(shí)不會(huì)釋放永久處于阻塞狀態(tài)的協(xié)程占用的資源。

采用最快回應(yīng)通道用例中,如果被當(dāng)作future/promise來用的通道的容量不足夠大,則較慢回應(yīng)的協(xié)程在準(zhǔn)備發(fā)送回應(yīng)結(jié)果時(shí)將永久阻塞。 比如,下面的例子中,每個(gè)請(qǐng)求將導(dǎo)致4個(gè)協(xié)程永久阻塞。

func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			c <- i // 4個(gè)協(xié)程將永久阻塞在這里
		}()
	}
	return <-c
}

為了防止有4個(gè)協(xié)程永久阻塞,被當(dāng)作future/promise使用的通道的容量必須至少為4.

第二種“采用最快回應(yīng)”實(shí)現(xiàn)方法中,如果被當(dāng)作future/promise使用的通道是一個(gè)非緩沖通道(如下面的代碼所示),則有可能導(dǎo)致其通道的接收者可能會(huì)錯(cuò)過所有的回應(yīng)而導(dǎo)致處于永久阻塞狀態(tài)。

func request() int {
	c := make(chan int)
	for i := 0; i < 5; i++ {
		i := i
		go func() {
			select {
			case c <- i:
			default:
			}
		}()
	}
	return <-c // 有可能永久阻塞在此
}

接收者協(xié)程可能會(huì)永久阻塞的原因是如果5個(gè)嘗試發(fā)送操作都發(fā)生在接收操作<-c準(zhǔn)備好之前,亦即5個(gè)個(gè)嘗試發(fā)送操作都失敗了,則接收者協(xié)程將永遠(yuǎn)無值可接收(從而將處于永久阻塞狀態(tài))。

將通道c改為一個(gè)緩沖通道,則至少會(huì)有一個(gè)嘗試發(fā)送將成功,從而接收者協(xié)程肯定不會(huì)永久阻塞。

復(fù)制sync標(biāo)準(zhǔn)庫包中的類型的值

在實(shí)踐中,sync標(biāo)準(zhǔn)庫包中的類型(除了Locker接口類型)的值不應(yīng)該被復(fù)制。 我們只應(yīng)該復(fù)制它們的指針值。

下面是一個(gè)有問題的并發(fā)編程的例子。 在此例子中,當(dāng)Counter.Value方法被調(diào)用時(shí),一個(gè)Counter屬主值將被復(fù)制,此屬主值的字段Mutex也將被一同復(fù)制。 此復(fù)制并沒有被同步保護(hù),因此復(fù)制結(jié)果可能是不完整的,并非被復(fù)制的屬主值的一個(gè)快照。 即使此Mutex字段得以僥幸完整復(fù)制,它的副本所保護(hù)的是對(duì)字段n的一個(gè)副本的訪問,因此一般是沒有意義的。

import "sync"

type Counter struct {
	sync.Mutex
	n int64
}

// 此方法實(shí)現(xiàn)是沒問題的。
func (c *Counter) Increase(d int64) (r int64) {
	c.Lock()
	c.n += d
	r = c.n
	c.Unlock()
	return
}

// 此方法的實(shí)現(xiàn)是有問題的。當(dāng)它被調(diào)用時(shí),
// 一個(gè)Counter屬主值將被復(fù)制。
func (c Counter) Value() (r int64) {
	c.Lock()
	r = c.n
	c.Unlock()
	return
}

我們應(yīng)該將Value方法的屬主參數(shù)類型更改為指針類型*Counter來避免復(fù)制sync.Mutex值。

Go官方工具鏈中提供的go vet命令將提示此例中的Value方法的聲明可能是一個(gè)潛在的邏輯錯(cuò)誤。

在錯(cuò)誤的地方調(diào)用sync.WaitGroup.Add方法

每個(gè)sync.WaitGroup值內(nèi)部維護(hù)著一個(gè)計(jì)數(shù)。此計(jì)數(shù)的初始值為0。 如果一個(gè)sync.WaitGroup值的Wait方法在此計(jì)數(shù)為0的時(shí)候被調(diào)用,則此調(diào)用不會(huì)阻塞,否則此調(diào)用將一直阻塞到此計(jì)數(shù)變?yōu)?為止。

為了讓一個(gè)WaitGroup值的使用有意義,在此值的計(jì)數(shù)為0的情況下,對(duì)它的下一次Add方法的調(diào)用必須出現(xiàn)在對(duì)它的下一次Wait方法的調(diào)用之前。

比如,在下面的例子中,Add方法的調(diào)用位置是不合適的。 此例子程序的打印結(jié)果并不總是100,而可能是0100間的任何一個(gè)值。 原因是沒有任何一個(gè)Add方法調(diào)用可以確保發(fā)生在唯一的Wait方法調(diào)用之前,結(jié)果導(dǎo)致沒有任何一個(gè)Done方法調(diào)用可以確保發(fā)生在唯一的Wait方法調(diào)用返回之前。

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var x int32 = 0
	for i := 0; i < 100; i++ {
		go func() {
			wg.Add(1)
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}

	fmt.Println("等待片刻...")
	wg.Wait()
	fmt.Println(atomic.LoadInt32(&x))
}

我們應(yīng)該將對(duì)Add方法的調(diào)用移出匿名協(xié)程之外,像下面這樣,使得任何一個(gè)Done方法調(diào)用都確保發(fā)生在唯一的Wait方法調(diào)用返回之前。

...
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			atomic.AddInt32(&x, 1)
			wg.Done()
		}()
	}
...

不當(dāng)?shù)厥褂糜米鯢uture/Promise的通道

通道用例大全一文中,我們了解到一些函數(shù)可以返回用做future/promise的通道結(jié)果。 假設(shè)fafb是這樣的兩個(gè)函數(shù),則下面的調(diào)用方式并沒有體現(xiàn)出這兩個(gè)函數(shù)的真正價(jià)值。

doSomethingWithFutureArguments(<-fa(), <-fb())

在上面這行調(diào)用中,兩個(gè)實(shí)參值(promise回應(yīng)結(jié)果)的生成實(shí)際上是串行進(jìn)行的,future/promise的價(jià)值沒有體現(xiàn)出來。

我們應(yīng)該像下面這樣調(diào)用這兩個(gè)函數(shù)來并發(fā)生成兩個(gè)回應(yīng)結(jié)果:

ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-ca, <-cb)

沒有讓最后一個(gè)活躍的發(fā)送者關(guān)閉通道

Go程序員常犯的一個(gè)錯(cuò)誤是關(guān)閉一個(gè)后續(xù)可能還會(huì)有協(xié)程向其發(fā)送數(shù)據(jù)的通道。 當(dāng)向一個(gè)已關(guān)閉的通道發(fā)送數(shù)據(jù)的時(shí)候,一個(gè)恐慌將產(chǎn)生。

這樣的錯(cuò)誤曾經(jīng)發(fā)生在一些很有名的項(xiàng)目中,比如Kubernetes項(xiàng)目中的這個(gè)bug這個(gè)bug。

請(qǐng)閱讀此篇文章來了解如何安全和優(yōu)雅地關(guān)閉通道。

對(duì)地址不保證為8字節(jié)對(duì)齊的值執(zhí)行64位原子操作

64位非方法原子操作中涉及到的實(shí)參地址必須為8字節(jié)對(duì)齊的。不滿足此條件的64位原子操作將造成一個(gè)恐慌。 對(duì)于標(biāo)準(zhǔn)編譯器,這樣的情形只可能發(fā)生在32位的架構(gòu)中。 從Go 1.19版本開始,我們可以使用64位方法原子操作來避免這一缺陷。 請(qǐng)閱讀內(nèi)存布局一文來獲知如何確保讓64位的整數(shù)值的地址在32位的架構(gòu)中8字節(jié)對(duì)齊。

沒留意過多的time.After函數(shù)調(diào)用消耗了大量資源

time標(biāo)準(zhǔn)庫包中的After函數(shù)返回一個(gè)用做延遲通知的通道。 此函數(shù)給并發(fā)編程帶來了很多便利,但是它的每個(gè)調(diào)用都需要?jiǎng)?chuàng)建一個(gè)time.Timer值,此新創(chuàng)建的Timer值在傳遞給After函數(shù)調(diào)用的時(shí)長(zhǎng)(實(shí)參)內(nèi)肯定不會(huì)被垃圾回收。 如果此函數(shù)在某個(gè)時(shí)段內(nèi)被多次頻繁調(diào)用,則可能導(dǎo)致積累很多尚未過期的Timer值從而造成大量的內(nèi)存和計(jì)算消耗。

比如在下面這個(gè)例子中,如果longRunning函數(shù)被調(diào)用并且在一分鐘內(nèi)有一百萬條消息到達(dá), 那么在某個(gè)特定的很小時(shí)間段(大概若干秒)內(nèi)將存在一百萬個(gè)活躍的Timer值,即使其中只有一個(gè)是真正有用的。

import (
	"fmt"
	"time"
)

// 如果某兩個(gè)連續(xù)的消息的間隔大于一分鐘,此函數(shù)將返回。
func longRunning(messages <-chan string) {
	for {
		select {
		case <-time.After(time.Minute):
			return
		case msg := <-messages:
			fmt.Println(msg)
		}
	}
}

為了避免太多的Timer值被創(chuàng)建,我們應(yīng)該只使用(并復(fù)用)一個(gè)Timer值,像下面這樣:

func longRunning(messages <-chan string) {
	timer := time.NewTimer(time.Minute)
	defer timer.Stop()

	for {
		select {
		case <-timer.C: // 過期了
			return
		case msg := <-messages:
			fmt.Println(msg)

			// 此if代碼塊很重要。
			if !timer.Stop() {
				<-timer.C
			}
		}

		// 必須重置以復(fù)用。
		timer.Reset(time.Minute)
	}
}

注意,此示例中的if代碼塊用來舍棄一個(gè)可能在執(zhí)行第二個(gè)分支代碼塊的時(shí)候發(fā)送過來的超時(shí)通知。

不正確地使用time.Timer值

一個(gè)典型的time.Timer的使用已經(jīng)在上一節(jié)中展示了。一些解釋:

  • 如果一個(gè)Timer值已經(jīng)過期或者已經(jīng)被終止(stopped),則相應(yīng)的Stop方法調(diào)用返回false。 在此Timer值尚未終止的時(shí)候,Stop方法調(diào)用返回false只能意味著此Timer值已經(jīng)過期。
  • 一個(gè)Timer值被終止之后,它的通道字段C最多只能含有一個(gè)過期的通知。
  • 在一個(gè)Timer終止(stopped)之后并且在重置和重用此Timer值之前,我們應(yīng)該確保此Timer值中肯定不存在過期的通知。 這就是上一節(jié)中的例子中的if代碼塊的意義所在。

一個(gè)*Timer值的Reset方法必須在對(duì)應(yīng)Timer值過期或者終止之后才能被調(diào)用; 否則,此Reset方法調(diào)用和一個(gè)可能的向此Timer值的C通道字段的發(fā)送通知操作產(chǎn)生數(shù)據(jù)競(jìng)爭(zhēng)。

如果上一節(jié)中的例子中的select流程控制代碼塊中的第一個(gè)分支被選中,則這表示相應(yīng)的Timer值已經(jīng)過期,所以我們不必終止它。 但是我們必須在第二個(gè)分支中通過終止此Timer以檢查此Timer中是否存在一個(gè)過期的通知。 如果確實(shí)有一個(gè)過期的通知,我們必須在重用這個(gè)Timer之前將此過期的通知取出;否則,此過期的通知將下一個(gè)循環(huán)步導(dǎo)致在第一個(gè)分支立即被選中。

比如,下面這個(gè)程序?qū)⒃谶\(yùn)行后大概一秒鐘(而不是十秒鐘)后退出。 而且此程序存在著潛在的數(shù)據(jù)競(jìng)爭(zhēng)。

package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	timer := time.NewTimer(time.Second/2)
	select {
	case <-timer.C:
	default:
		time.Sleep(time.Second) // 此分支被選中的可能性較大
	}
	timer.Reset(time.Second * 10) // 可能數(shù)據(jù)競(jìng)爭(zhēng)
	<-timer.C
	fmt.Println(time.Since(start)) // 大約1s
}

當(dāng)一個(gè)time.Timer值不再被使用后,我們不必(但是推薦)終止之。

在多個(gè)協(xié)程中使用同一個(gè)time.Timer值比較容易寫出不當(dāng)?shù)牟l(fā)代碼,所以盡量不要跨協(xié)程使用一個(gè)Timer值。

我們不應(yīng)該依賴于time.TimerReset方法的返回值。此返回值只要是為了歷史兼容性而存在的。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)