在本文發(fā)表數(shù)日前,我曾寫了一篇文章來(lái)解釋通道的規(guī)則。 那篇文章在reddit和HN上獲得了很多點(diǎn)贊,但也有很多人對(duì)Go通道的細(xì)節(jié)設(shè)計(jì)提出了一些批評(píng)意見(jiàn)。
這些批評(píng)主要針對(duì)于通道設(shè)計(jì)中的下列細(xì)節(jié):
這些批評(píng)看上去有幾分道理(實(shí)際上屬于對(duì)通道的不正確使用導(dǎo)致的偏見(jiàn))。 是的,Go語(yǔ)言中并沒(méi)有提供一個(gè)內(nèi)置函數(shù)來(lái)檢查一個(gè)通道是否已經(jīng)關(guān)閉。
在Go中,如果我們能夠保證從不會(huì)向一個(gè)通道發(fā)送數(shù)據(jù),那么有一個(gè)簡(jiǎn)單的方法來(lái)判斷此通道是否已經(jīng)關(guān)閉。 此方法已經(jīng)在上一篇文章通道用例大全中展示過(guò)了。 這里為了本文的連貫性,在下面的例子中重新列出了此方法。
package main
import "fmt"
type T int
func IsClosed(ch <-chan T) bool {
select {
case <-ch:
return true
default:
}
return false
}
func main() {
c := make(chan T)
fmt.Println(IsClosed(c)) // false
close(c)
fmt.Println(IsClosed(c)) // true
}
如前所述,此方法并不是一個(gè)通用的檢查通道是否已經(jīng)關(guān)閉的方法。
事實(shí)上,即使有一個(gè)內(nèi)置closed
函數(shù)用來(lái)檢查一個(gè)通道是否已經(jīng)關(guān)閉,它的有用性也是十分有限的。 原因是當(dāng)此函數(shù)的一個(gè)調(diào)用的結(jié)果返回時(shí),被查詢的通道的狀態(tài)可能已經(jīng)又改變了,導(dǎo)致此調(diào)用結(jié)果并不能反映出被查詢的通道的最新?tīng)顟B(tài)。 雖然我們可以根據(jù)一個(gè)調(diào)用closed(ch)
的返回結(jié)果為true
而得出我們不應(yīng)該再向通道ch
發(fā)送數(shù)據(jù)的結(jié)論, 但是我們不能根據(jù)一個(gè)調(diào)用closed(ch)
的返回結(jié)果為false
而得出我們可以繼續(xù)向通道ch
發(fā)送數(shù)據(jù)的結(jié)論。
一個(gè)常用的使用Go通道的原則是不要在數(shù)據(jù)接收方或者在有多個(gè)發(fā)送者的情況下關(guān)閉通道。 換句話說(shuō),我們只應(yīng)該讓一個(gè)通道唯一的發(fā)送者關(guān)閉此通道。
下面我們將稱此原則為通道關(guān)閉原則。
當(dāng)然,這并不是一個(gè)通用的關(guān)閉通道的原則。通用的原則是不要關(guān)閉已關(guān)閉的通道。 如果我們能夠保證從某個(gè)時(shí)刻之后,再?zèng)]有協(xié)程將向一個(gè)未關(guān)閉的非nil通道發(fā)送數(shù)據(jù),則一個(gè)協(xié)程可以安全地關(guān)閉此通道。 然而,做出這樣的保證常常需要很大的努力,從而導(dǎo)致代碼過(guò)度復(fù)雜。 另一方面,遵循通道關(guān)閉原則是一件相對(duì)簡(jiǎn)單的事兒。
如果由于某種原因,你一定非要從數(shù)據(jù)接收方或者讓眾多發(fā)送者中的一個(gè)關(guān)閉一個(gè)通道,你可以使用恢復(fù)機(jī)制來(lái)防止可能產(chǎn)生的恐慌而導(dǎo)致程序崩潰。 下面就是這樣的一個(gè)實(shí)現(xiàn)(假設(shè)通道的元素類型為T
)。
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
// 一個(gè)函數(shù)的返回結(jié)果可以在defer調(diào)用中修改。
justClosed = false
}
}()
// 假設(shè)ch != nil。
close(ch) // 如果ch已關(guān)閉,則產(chǎn)生一個(gè)恐慌。
return true // <=> justClosed = true; return
}
此方法違反了通道關(guān)閉原則。
同樣的方法可以用來(lái)粗魯?shù)叵蛞粋€(gè)關(guān)閉狀態(tài)未知的通道發(fā)送數(shù)據(jù)。
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
closed = true
}
}()
ch <- value // 如果ch已關(guān)閉,則產(chǎn)生一個(gè)恐慌。
return false // <=> closed = false; return
}
這樣的粗魯方法不僅違反了通道關(guān)閉原則,而且Go白皮書和標(biāo)準(zhǔn)編譯器不保證它的實(shí)現(xiàn)中不存在數(shù)據(jù)競(jìng)爭(zhēng)。
很多Go程序員喜歡使用sync.Once
來(lái)關(guān)閉通道。
type MyChannel struct {
C chan T
once sync.Once
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.once.Do(func() {
close(mc.C)
})
}
當(dāng)然,我們也可以使用sync.Mutex
來(lái)防止多次關(guān)閉一個(gè)通道。
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
defer mc.mutex.Unlock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
}
func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
這些實(shí)現(xiàn)確實(shí)比上一節(jié)中的方法禮貌一些,但是它們不能完全有效地避免數(shù)據(jù)競(jìng)爭(zhēng)。 目前的Go白皮書并不保證發(fā)生在一個(gè)通道上的并發(fā)關(guān)閉操作和發(fā)送操作不會(huì)產(chǎn)生數(shù)據(jù)競(jìng)爭(zhēng)。 如果一個(gè)SafeClose
函數(shù)和同一個(gè)通道上的發(fā)送操作同時(shí)運(yùn)行,則數(shù)據(jù)競(jìng)爭(zhēng)可能發(fā)生(雖然這樣的數(shù)據(jù)競(jìng)爭(zhēng)一般并不會(huì)帶來(lái)什么危害)。
上一節(jié)中介紹的SafeSend
函數(shù)有一個(gè)弊端,它的調(diào)用不能做為case
操作而被使用在select
代碼塊中。 另外,很多Go程序員(包括我)認(rèn)為上面兩節(jié)展示的關(guān)閉通道的方法不是很優(yōu)雅。 本節(jié)下面將介紹一些在各種情形下使用純通道操作來(lái)關(guān)閉通道的方法。
(為了演示程序的完整性,下面這些例子中使用到了sync.WaitGroup
。在實(shí)踐中,sync.WaitGroup
并不是必需的。)
這是最簡(jiǎn)單的一種情形。當(dāng)發(fā)送者欲結(jié)束發(fā)送,讓它關(guān)閉用來(lái)傳輸數(shù)據(jù)的通道即可。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
// 發(fā)送者
go func() {
for {
if value := rand.Intn(Max); value == 0 {
// 此唯一的發(fā)送者可以安全地關(guān)閉此數(shù)據(jù)通道。
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// 接收者
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// 接收數(shù)據(jù)直到通道dataCh已關(guān)閉
// 并且dataCh的緩沖隊(duì)列已空。
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
此情形比上一種情形復(fù)雜一些。我們不能讓接收者關(guān)閉用來(lái)傳輸數(shù)據(jù)的通道來(lái)停止數(shù)據(jù)傳輸,因?yàn)檫@樣做違反了通道關(guān)閉原則。 但是我們可以讓接收者關(guān)閉一個(gè)額外的信號(hào)通道來(lái)通知發(fā)送者不要再發(fā)送數(shù)據(jù)了。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh是一個(gè)額外的信號(hào)通道。它的
// 發(fā)送者為dataCh數(shù)據(jù)通道的接收者。
// 它的接收者為dataCh數(shù)據(jù)通道的發(fā)送者。
// 發(fā)送者
for i := 0; i < NumSenders; i++ {
go func() {
for {
// 這里的第一個(gè)嘗試接收用來(lái)讓此發(fā)送者
// 協(xié)程盡早地退出。對(duì)于這個(gè)特定的例子,
// 此select代碼塊并非必需。
select {
case <- stopCh:
return
default:
}
// 即使stopCh已經(jīng)關(guān)閉,此第二個(gè)select
// 代碼塊中的第一個(gè)分支仍很有可能在若干個(gè)
// 循環(huán)步內(nèi)依然不會(huì)被選中。如果這是不可接受
// 的,則上面的第一個(gè)select代碼塊是必需的。
select {
case <- stopCh:
return
case dataCh <- rand.Intn(Max):
}
}
}()
}
// 接收者
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == Max-1 {
// 此唯一的接收者同時(shí)也是stopCh通道的
// 唯一發(fā)送者。盡管它不能安全地關(guān)閉dataCh數(shù)
// 據(jù)通道,但它可以安全地關(guān)閉stopCh通道。
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
如此例中的注釋所述,對(duì)于此額外的信號(hào)通道stopCh
,它只有一個(gè)發(fā)送者,即dataCh
數(shù)據(jù)通道的唯一接收者。 dataCh
數(shù)據(jù)通道的接收者關(guān)閉了信號(hào)通道stopCh
,這是不違反通道關(guān)閉原則的。
在此例中,數(shù)據(jù)通道dataCh
并沒(méi)有被關(guān)閉。是的,我們不必關(guān)閉它。 當(dāng)一個(gè)通道不再被任何協(xié)程所使用后,它將逐漸被垃圾回收掉,無(wú)論它是否已經(jīng)被關(guān)閉。 所以這里的優(yōu)雅性體現(xiàn)在通過(guò)不關(guān)閉一個(gè)通道來(lái)停止使用此通道。
這是最復(fù)雜的一種情形。我們不能讓接收者和發(fā)送者中的任何一個(gè)關(guān)閉用來(lái)傳輸數(shù)據(jù)的通道,我們也不能讓多個(gè)接收者之一關(guān)閉一個(gè)額外的信號(hào)通道。 這兩種做法都違反了通道關(guān)閉原則。 然而,我們可以引入一個(gè)中間調(diào)解者角色并讓其關(guān)閉額外的信號(hào)通道來(lái)通知所有的接收者和發(fā)送者結(jié)束工作。 具體實(shí)現(xiàn)見(jiàn)下例。注意其中使用了一個(gè)嘗試發(fā)送操作來(lái)向中間調(diào)解者發(fā)送信號(hào)。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
stopCh := make(chan struct{})
// stopCh是一個(gè)額外的信號(hào)通道。它的發(fā)送
// 者為中間調(diào)解者。它的接收者為dataCh
// 數(shù)據(jù)通道的所有的發(fā)送者和接收者。
toStop := make(chan string, 1)
// toStop是一個(gè)用來(lái)通知中間調(diào)解者讓其
// 關(guān)閉信號(hào)通道stopCh的第二個(gè)信號(hào)通道。
// 此第二個(gè)信號(hào)通道的發(fā)送者為dataCh數(shù)據(jù)
// 通道的所有的發(fā)送者和接收者,它的接收者
// 為中間調(diào)解者。它必須為一個(gè)緩沖通道。
var stoppedBy string
// 中間調(diào)解者
go func() {
stoppedBy = <-toStop
close(stopCh)
}()
// 發(fā)送者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
// 為了防止阻塞,這里使用了一個(gè)嘗試
// 發(fā)送操作來(lái)向中間調(diào)解者發(fā)送信號(hào)。
select {
case toStop <- "發(fā)送者#" + id:
default:
}
return
}
// 此處的嘗試接收操作是為了讓此發(fā)送協(xié)程盡早
// 退出。標(biāo)準(zhǔn)編譯器對(duì)嘗試接收和嘗試發(fā)送做了
// 特殊的優(yōu)化,因而它們的速度很快。
select {
case <- stopCh:
return
default:
}
// 即使stopCh已關(guān)閉,如果這個(gè)select代碼塊
// 中第二個(gè)分支的發(fā)送操作是非阻塞的,則第一個(gè)
// 分支仍很有可能在若干個(gè)循環(huán)步內(nèi)依然不會(huì)被選
// 中。如果這是不可接受的,則上面的第一個(gè)嘗試
// 接收操作代碼塊是必需的。
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// 接收者
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// 和發(fā)送者協(xié)程一樣,此處的嘗試接收操作是為了
// 讓此接收協(xié)程盡早退出。
select {
case <- stopCh:
return
default:
}
// 即使stopCh已關(guān)閉,如果這個(gè)select代碼塊
// 中第二個(gè)分支的接收操作是非阻塞的,則第一個(gè)
// 分支仍很有可能在若干個(gè)循環(huán)步內(nèi)依然不會(huì)被選
// 中。如果這是不可接受的,則上面嘗試接收操作
// 代碼塊是必需的。
select {
case <- stopCh:
return
case value := <-dataCh:
if value == Max-1 {
// 為了防止阻塞,這里使用了一個(gè)嘗試
// 發(fā)送操作來(lái)向中間調(diào)解者發(fā)送信號(hào)。
select {
case toStop <- "接收者#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("被" + stoppedBy + "終止了")
}
在此例中,通道關(guān)閉原則依舊得到了遵守。
請(qǐng)注意,信號(hào)通道toStop
的容量必須至少為1。 如果它的容量為0,則在中間調(diào)解者還未準(zhǔn)備好的情況下就已經(jīng)有某個(gè)協(xié)程向toStop
發(fā)送信號(hào)時(shí),此信號(hào)將被拋棄。
我們也可以不使用嘗試發(fā)送操作向中間調(diào)解者發(fā)送信號(hào),但信號(hào)通道toStop
的容量必須至少為數(shù)據(jù)發(fā)送者和數(shù)據(jù)接收者的數(shù)量之和,以防止向其發(fā)送數(shù)據(jù)時(shí)(有一個(gè)極其微小的可能)導(dǎo)致某些發(fā)送者和接收者協(xié)程永久阻塞。
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
toStop <- "sender#" + id
return
}
...
if value == Max-1 {
toStop <- "receiver#" + id
return
}
...
有時(shí),數(shù)據(jù)通道(dataCh
)的關(guān)閉請(qǐng)求需要由某個(gè)第三方協(xié)程發(fā)出。對(duì)于這種情形,我們可以使用一個(gè)額外的信號(hào)通道來(lái)通知唯一的發(fā)送者關(guān)閉數(shù)據(jù)通道(dataCh
)。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 100000
const NumReceivers = 100
const NumThirdParties = 15
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int)
closing := make(chan struct{}) // 信號(hào)通道
closed := make(chan struct{})
// 此stop函數(shù)可以被安全地多次調(diào)用。
stop := func() {
select {
case closing<-struct{}{}:
<-closed
case <-closed:
}
}
// 一些第三方協(xié)程
for i := 0; i < NumThirdParties; i++ {
go func() {
r := 1 + rand.Intn(3)
time.Sleep(time.Duration(r) * time.Second)
stop()
}()
}
// 發(fā)送者
go func() {
defer func() {
close(closed)
close(dataCh)
}()
for {
select{
case <-closing: return
default:
}
select{
case <-closing: return
case dataCh <- rand.Intn(Max):
}
}
}()
// 接收者
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
上述代碼中的stop
函數(shù)中使用的技巧偷自Roger Peppe在此貼中的一個(gè)留言。
在上面的提到的“N個(gè)發(fā)送者”情形中,為了遵守通道關(guān)閉原則,我們避免了關(guān)閉數(shù)據(jù)通道(dataCh
)。 但是有時(shí)候,數(shù)據(jù)通道(dataCh
)必須被關(guān)閉以通知各個(gè)接收者數(shù)據(jù)發(fā)送已經(jīng)結(jié)束。 對(duì)于這種“N個(gè)發(fā)送者”情形,我們可以使用一個(gè)中間通道將它們轉(zhuǎn)化為“一個(gè)發(fā)送者”情形,然后繼續(xù)使用上一節(jié)介紹的技巧來(lái)關(guān)閉此中間通道,從而避免了關(guān)閉原始的dataCh
數(shù)據(jù)通道。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const Max = 1000000
const NumReceivers = 10
const NumSenders = 1000
const NumThirdParties = 15
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int) // 將被關(guān)閉
middleCh := make(chan int) // 不會(huì)被關(guān)閉
closing := make(chan string)
closed := make(chan struct{})
var stoppedBy string
stop := func(by string) {
select {
case closing <- by:
<-closed
case <-closed:
}
}
// 中間層
go func() {
exit := func(v int, needSend bool) {
close(closed)
if needSend {
dataCh <- v
}
close(dataCh)
}
for {
select {
case stoppedBy = <-closing:
exit(0, false)
return
case v := <- middleCh:
select {
case stoppedBy = <-closing:
exit(v, true)
return
case dataCh <- v:
}
}
}
}()
// 一些第三方協(xié)程
for i := 0; i < NumThirdParties; i++ {
go func(id string) {
r := 1 + rand.Intn(3)
time.Sleep(time.Duration(r) * time.Second)
stop("3rd-party#" + id)
}(strconv.Itoa(i))
}
// 發(fā)送者
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(Max)
if value == 0 {
stop("sender#" + id)
return
}
select {
case <- closed:
return
default:
}
select {
case <- closed:
return
case middleCh <- value:
}
}
}(strconv.Itoa(i))
}
// 接收者
for range [NumReceivers]struct{}{} {
go func() {
defer wgReceivers.Done()
for value := range dataCh {
log.Println(value)
}
}()
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
在日常編程中可能會(huì)遇到更多的變種情形,但是上面介紹的情形是最常見(jiàn)和最基本的。 通過(guò)聰明地使用通道(和其它并發(fā)同步技術(shù)),我相信,對(duì)于各種變種,我們總會(huì)找到相應(yīng)的遵守通道關(guān)閉原則的解決方法。
并沒(méi)有什么情況非得逼得我們違反通道關(guān)閉原則。 如果你遇到了此情形,請(qǐng)考慮修改你的代碼流程和結(jié)構(gòu)設(shè)計(jì)。
使用通道編程宛如在藝術(shù)創(chuàng)作一般!
更多建議: