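Go Channels Explained, from the Basics to the Internals

The channel is one of the core features of Go, so understanding how channels work matters a great deal for learning and using the language.

A channel is a way for goroutines to communicate with each other, comparable to pipes as an inter-process communication mechanism in Unix.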

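The CSP model

Before discussing channels, it is worth mentioning the CSP model. Message-passing concurrency models fall mainly into two families, the Actor model and the CSP model. CSP stands for communicating sequential processes. A CSP system consists of concurrently executing entities (processes, threads, or coroutines) and message channels; the entities communicate by sending messages over the channels. Unlike the Actor model, CSP focuses on the carrier of the message, namely the channel, rather than on the entity that sends it. Readers who want to go further can read the paper Communicating Sequential Processes. Go's concurrency model draws on CSP theory: the executing entity corresponds to the goroutine, and the message channel corresponds to the channel.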

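Introducing channels

A channel provides a communication mechanism through which one goroutine can send messages to another. Every channel also has an associated type, which is the type of data it can carry. For example, a channel that carries int messages is written chan int.

Creating a channel

A channel is created with the built-in make function. The following declares a channel of type chan int: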

ch := make(chan int)

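As with a map, make creates a reference to an underlying data structure. Assigning a channel or passing it as an argument copies only the reference, which still points to the same channel object. As with other reference types, the zero value of a channel is nil. Channels of the same type can be compared with ==; the result is true only when both refer to the same object or both are nil.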
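The reference semantics described above can be sketched as follows. The helper names send and sameChannel are made up for this illustration and are not part of any library.

```go
package main

import "fmt"

// send writes v into whatever channel it is handed. Only the channel
// reference is copied into the parameter, not the underlying queue.
func send(ch chan int, v int) {
	ch <- v
}

// sameChannel reports whether a and b refer to the same channel object
// (or are both nil).
func sameChannel(a, b chan int) bool {
	return a == b
}

func main() {
	ch := make(chan int, 1)
	alias := ch // copies the reference only

	send(alias, 42)
	fmt.Println(<-ch) // the value sent through alias arrives on ch

	var nilCh chan int
	fmt.Println(sameChannel(ch, alias))             // same object
	fmt.Println(sameChannel(ch, make(chan int, 1))) // different objects
	fmt.Println(nilCh == nil)                       // the zero value is nil
}
```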


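Reading from and writing to a channel

ch := make(chan int)

// write to channel (v is any int value)
ch <- v

// read from channel into a new variable
x := <-ch

// another way to read: assign to an existing variable
x = <-ch

A channel must be initialized before it is read from or written to; operations on an uninitialized (nil) channel block forever.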
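A small sketch of the nil-channel behavior, under the assumption that a 50 ms wait is long enough to call a receive "blocked". The helper recvBlocks and the wait constant are invented for this example. Inside a select, a case on a nil channel is never ready, so the timer case wins.

```go
package main

import (
	"fmt"
	"time"
)

// wait is an arbitrary bound chosen for this illustration.
const wait = 50 * time.Millisecond

// recvBlocks reports whether a receive on ch is still blocked after wait.
func recvBlocks(ch chan int) bool {
	select {
	case <-ch:
		return false
	case <-time.After(wait):
		return true
	}
}

func main() {
	var uninit chan int // declared but never passed through make, so nil
	fmt.Println(recvBlocks(uninit))

	ready := make(chan int, 1)
	ready <- 1
	fmt.Println(recvBlocks(ready))
}
```

Outside a select there is no timer to fall back on: a bare receive or send on a nil channel parks the goroutine permanently.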

Closing a channel

Go provides the built-in close function for closing a channel.

ch := make(chan int)

close(ch)

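When closing channels, keep the following in mind:

  • Closing an uninitialized (nil) channel causes a panic.
  • Closing the same channel more than once causes a panic.
  • Sending to a closed channel causes a panic.
  • Receiving from a closed channel does not panic, and any messages still in the channel can be read out. Once they have all been read, a receive yields the zero value of the element type. A receive from a closed channel never blocks, and once the channel is drained the second result of the comma-ok form is false, which can be used to tell that the channel has been closed.
  • Closing a channel acts as a broadcast: every goroutine blocked receiving from the channel is woken up.

ch := make(chan int, 10)
ch <- 11
ch <- 12
close(ch)

for x := range ch {
	fmt.Println(x)
}

x, ok := <-ch
fmt.Println(x, ok)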


-----
output:

11
12
0 false
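The broadcast effect of close can be sketched like this. The function broadcast and its counter are hypothetical names for the illustration; the point is that one close releases every goroutine receiving from the channel.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast starts n goroutines that all receive from the same channel,
// closes the channel once, and returns how many goroutines were released.
func broadcast(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	released := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // returns only after close(done)
			mu.Lock()
			released++
			mu.Unlock()
		}()
	}

	close(done) // a single close releases every receiver
	wg.Wait()
	return released
}

func main() {
	fmt.Println(broadcast(5))
}
```

No value is ever sent on done; the receivers are released purely by the close, which is why chan struct{} is a common choice for this kind of signal.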

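Types of channels

Channels come in two kinds: unbuffered and buffered.

Unbuffered channels

A receive from an unbuffered channel blocks until some goroutine sends to that channel. Likewise, a send on an unbuffered channel blocks until some goroutine receives from it.

When communicating over an unbuffered channel, the receiver obtaining the data happens before the sending goroutine is woken up.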
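The ordering guarantee above can be illustrated with a sketch modeled on the example in the Go memory model document. The function name rendezvous is an assumption of this example. Because the send on an unbuffered channel completes only after the matching receive, the write made before the receive is visible once the send returns.

```go
package main

import "fmt"

// rendezvous relies on the rule that a receive from an unbuffered channel
// happens before the corresponding send completes.
func rendezvous() string {
	var msg string
	c := make(chan int) // unbuffered

	go func() {
		msg = "hello" // written before the receive below
		<-c
	}()

	c <- 0 // returns only after the receive has happened
	return msg
}

func main() {
	fmt.Println(rendezvous())
}
```

With a buffered channel the send could complete before the goroutine had run at all, so the same code would no longer be guaranteed to observe the write.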

Buffered channels

A buffered channel is declared by passing a second argument to make, which is the capacity of the channel's buffer:

ch := make(chan int, 10)

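A buffered channel behaves like a blocking queue (implemented as a circular array). While the buffer is not full, a send does not block; when it is full, the send blocks until another goroutine receives from the channel. Correspondingly, while the channel holds messages, a receive does not block; when the channel is empty, the receive blocks until some goroutine sends to it.

ch := make(chan int, 3)

// blocked, read from empty buffered channel
<-ch

ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// blocked, send to full buffered channel
ch <- 4

The len function returns the number of elements currently in a channel, and the cap function returns the capacity of its buffer.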
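A brief sketch of len and cap on channels. The helper usage is a made-up name for this example, and it assumes n never exceeds size, since otherwise the extra send would block.

```go
package main

import "fmt"

// usage returns len and cap of a buffered channel after n sends.
// n must not exceed size, otherwise a send would block forever.
func usage(size, n int) (int, int) {
	ch := make(chan int, size)
	for i := 0; i < n; i++ {
		ch <- i
	}
	return len(ch), cap(ch)
}

func main() {
	fmt.Println(usage(3, 2)) // two queued elements, capacity three
	fmt.Println(usage(3, 0)) // empty buffer
	fmt.Println(usage(0, 0)) // an unbuffered channel has capacity zero
}
```

The value of len is only a snapshot: with other goroutines sending or receiving, it may already be stale by the time it is inspected.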

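Using channels

Communication between goroutines

Here is an example from Effective Go:

c := make(chan int) // Allocate a channel.

// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
	list.Sort()
	c <- 1 // Send a signal; value does not matter.
}()

doSomethingForAWhile()
<-c // Wait for sort to finish; discard sent value.

The main goroutine blocks until the goroutine running the sort has finished.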

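Iterating with range

A channel can also be read with range. The loop keeps receiving from the channel and ends only once some goroutine has closed the channel and the remaining values have been read.

// consumer worker
ch := make(chan int, 10)
for x := range ch {
	fmt.Println(x)
}

This is equivalent to:

for {
	x, ok := <-ch
	if !ok {
		break
	}

	fmt.Println(x)
}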

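Using channels with select

select works much like I/O multiplexing: it can wait on the readiness of several channels at once. Consider the following example:

select {
case <-ch1:
	...
case <-ch2:
	...
case ch3 <- 10:
	...
default:
	...
}

  • select can wait on sends and receives across several channels at the same time.
  • When select runs, if exactly one case can proceed (would not block), that case is executed.
  • If several cases can proceed, one of them is chosen at random.
  • If every case would block and a default clause is defined, the default clause is executed. Without a default clause, the select statement blocks until one of the cases can proceed.
  • break exits the select block.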
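The default rule in the list above is what makes non-blocking channel operations possible. A minimal sketch, with trySend and tryRecv as hypothetical helper names:

```go
package main

import "fmt"

// trySend attempts to send v without blocking. The default clause runs
// when the send case cannot proceed immediately.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// tryRecv attempts to receive without blocking.
func tryRecv(ch chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // the buffer has room
	fmt.Println(trySend(ch, 2)) // the buffer is full, so default runs
	fmt.Println(tryRecv(ch))    // the queued value is returned
	fmt.Println(tryRecv(ch))    // nothing left, so default runs
}
```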

1. Setting a timeout

ch := make(chan struct{})

// finish task while send msg to ch
go doTask(ch)

timeout := time.After(5 * time.Second)
select {
case <-ch:
	fmt.Println("task finished.")
case <-timeout:
	fmt.Println("task timeout.")
}

2. Quit channel

In some scenarios, worker goroutines need to keep processing messages in a loop until they receive a quit signal:

msgCh := make(chan struct{})
quitCh := make(chan struct{})
for {
	select {
	case <-msgCh:
		doWork()
	case <-quitCh:
		finish()
		return
	}
}
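A self-contained version of the quit-channel pattern might look like the sketch below. The names worker, run, and the result channel are assumptions made for the example; counting messages stands in for doWork, and reporting the count stands in for finish.

```go
package main

import "fmt"

// worker counts messages from msgCh until quitCh is closed, then reports
// the total on result and returns.
func worker(msgCh <-chan struct{}, quitCh <-chan struct{}, result chan<- int) {
	handled := 0
	for {
		select {
		case <-msgCh:
			handled++ // stands in for doWork()
		case <-quitCh:
			result <- handled // stands in for finish()
			return
		}
	}
}

// run sends n messages, then signals quit and returns the worker's count.
func run(n int) int {
	msgCh := make(chan struct{})
	quitCh := make(chan struct{})
	result := make(chan int)

	go worker(msgCh, quitCh, result)
	for i := 0; i < n; i++ {
		msgCh <- struct{}{} // unbuffered: returns once the worker has received
	}
	close(quitCh)
	return <-result
}

func main() {
	fmt.Println(run(3))
}
```

Because msgCh is unbuffered, every message has been received by the time quitCh is closed, so the worker never sees both cases ready with work still pending.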

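Unidirectional channels

These are channels that can only be sent to or only be received from. A channel that really could only be written, or only be read, would be useless; unidirectional channel types are only used in declarations, for example:

func foo(ch chan<- int) <-chan int {...}

chan<- int is a send-only channel type, and <-chan int is a receive-only channel type.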
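As a sketch of how directional types are used in function signatures, the example below passes one ordinary channel to two functions that each see only one direction. The names produce and sum are hypothetical.

```go
package main

import "fmt"

// produce can only send on out; an attempt to receive from out inside
// this function would be rejected by the compiler.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum can only receive from in.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converts implicitly at each call
	go produce(ch, 4)
	fmt.Println(sum(ch))
}
```

The restriction is checked at compile time only; at run time both parameters refer to the same ordinary channel.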

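Channel source code analysis

The main implementation of channels lives in src/runtime/chan.go. All source code below is based on go1.9.2. The goal of reading the source is to understand channel behavior better and to use channels correctly, so while reading the code it helps to recall the channel properties covered in the earlier sections.

Channel data structures

The types behind a channel are defined as follows: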

// definition of the channel type
type hchan struct {
// number of elements in the channel (len)
qcount uint // total data in the queue

// size of the channel buffer (cap)
dataqsiz uint // size of the circular queue

// the channel's buffer, implemented as a circular array
buf unsafe.Pointer // points to an array of dataqsiz elements

// size of a single element
elemsize uint16

// closed flag
closed uint32

// type of the elements
elemtype *_type // element type

// send and receive indexes, used to implement the circular queue
sendx uint // send index
recvx uint // receive index

// wait queue of receiving goroutines
recvq waitq // list of recv waiters

// wait queue of sending goroutines
sendq waitq // list of send waiters

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

// linked-list implementation of the wait queue
type waitq struct {
first *sudog
last *sudog
}

// in src/runtime/runtime2.go
// a wrapper around a G
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64

ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

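As shown above, a channel is made up of three main parts: a queue implemented as a circular array, which stores the message elements; two goroutine wait queues implemented as linked lists, which hold the goroutines blocked on recv and send operations; and a mutex, which synchronizes changes to all of these fields.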
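To make the circular-array bookkeeping concrete, here is a deliberately simplified model. It is not the runtime implementation: the type ring and its methods are invented for this sketch, it stores plain ints, and it has no lock and no wait queues. Only the qcount, sendx, and recvx fields mirror hchan.

```go
package main

import "fmt"

// ring is a single-goroutine toy model of the buffer fields in hchan.
type ring struct {
	buf    []int
	qcount int // number of stored elements (what len reports)
	sendx  int // index of the next slot to write
	recvx  int // index of the next slot to read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size)}
}

// push stores v and reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop removes the oldest element and reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(3)
	r.push(1)
	r.push(2)
	r.push(3)
	fmt.Println(r.push(4))                  // full: rejected
	fmt.Println(r.pop())                    // oldest element comes out first
	fmt.Println(r.push(4))                  // accepted, reusing slot 0
	fmt.Println(r.sendx, r.recvx, r.qcount) // both indexes have moved to 1
}
```

Where this model returns false, the real channel either parks the goroutine on sendq or recvq or, for a non-blocking operation, reports failure.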

Implementation of make

func makechan(t *chantype, size int64) *hchan {
elem := t.elem

// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}

var c *hchan

if elem.kind&kindNoPointers != 0 || size == 0 {
// case 1: the element type contains no pointers
// case 2: size == 0, i.e. an unbuffered channel
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.

// allocate one contiguous block on the heap for the channel
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
// buffered channel whose elements contain pointers
c = new(hchan)
// allocate the buf memory separately on the heap
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}

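The make path is fairly simple. The one point to note is that when the element type contains no pointers, the hchan header and its buffer are allocated as a single contiguous block.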

channel send

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

// as noted in an earlier section, sending to an uninitialized (nil) channel blocks forever
if c == nil {
if !block {
return false
}

// gopark puts the current goroutine to sleep. No unlockf is passed (nil) and the goroutine is not placed on any wait queue, so nothing will ever wake it and it sleeps forever
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}

if raceenabled {
racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {

t0 = cputicks()
}

// acquire the channel lock
lock(&c.lock)

// as noted earlier, sending to a closed channel causes a panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// CASE1: a goroutine is waiting on the recv queue; bypass the buffer and hand the message directly to the receiver goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// CASE2: the buffer is not full; copy the message into the buffer
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

// CASE3: the buffer is full; add the goroutine to the send queue

// initialize the sudog
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// enqueue on the send queue
c.sendq.enqueue(mysg)
// go to sleep
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

// execution resumes here once the goroutine has been woken
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}

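The send code reflects several of the properties described in the earlier sections.

A send falls into one of the following cases:

  • A goroutine is blocked on the channel's recv queue, which means the buffer is empty. The message is handed directly to the receiver goroutine, with only one copy.
  • The channel's buffer has free space. The data is placed in the buffer to await a receiver; by the time it is received, it has been copied twice in total.
  • The channel's buffer is full. The current goroutine is added to the send queue and blocks.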

channel receive

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}

// receiving from a nil channel blocks forever
if c == nil {

if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

// CASE1: receiving from a channel that is closed and empty returns the zero value
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

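// CASE2: the send queue is not empty
// CASE2.1: the buffer is empty, so receive the element directly from the sender
// CASE2.2: the buffer is not empty, which can only mean it is full. Take the element at the head of the queue, then wake the sender and write its element to the tail of the queue. Because the queue is circular, when it is full it is enough to copy the head to the receiver, copy the sender's element into that same slot, and advance the head and tail indexes; no queue elements need to be moved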

if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

// CASE3: the buffer is not empty; take the element directly from the queue and advance the head index
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

// CASE4: the buffer is empty; add the goroutine to the recv queue and block
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil

gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
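The full-buffer case (CASE2.2) cannot be observed directly from user code, but its visible consequence is that FIFO order is kept even when a sender is waiting behind a full buffer. The sketch below shows only that observable ordering; drain is a hypothetical name, and whether the extra sender has actually parked before the first receive depends on scheduling.

```go
package main

import "fmt"

// drain fills a buffered channel, starts one extra sender that has to wait
// for space, and returns the values in the order they are received.
func drain() []int {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // the buffer is now full

	go func() {
		ch <- 3 // cannot complete until a receive frees a slot
		close(ch)
	}()

	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(drain())
}
```

If the sender is already parked when the first receive runs, the runtime hands the head element to the receiver and stores the sender's value in the slot just freed, which is what keeps the waiting value at the back of the queue.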

channel close

func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)

// closing an already closed channel causes a panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc(unsafe.Pointer(&c))
racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
racerelease(unsafe.Pointer(c))
}

c.closed = 1

var glist *g


// wake up all receivers
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}

// wake up all senders; they will panic
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()

gp.schedlink = 0
goready(gp, 3)
}
}
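The panic messages that appear in the runtime source above can be observed from ordinary code. In this sketch the helper panics is an invented name; it runs a function, recovers from any panic, and returns the message.

```go
package main

import "fmt"

// panics runs f and returns the panic message, or "" if f did not panic.
func panics(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	// Closing a nil channel.
	fmt.Println(panics(func() {
		var c chan int
		close(c)
	}))

	// Closing the same channel twice.
	fmt.Println(panics(func() {
		c := make(chan int)
		close(c)
		close(c)
	}))

	// Sending on a closed channel.
	fmt.Println(panics(func() {
		c := make(chan int, 1)
		close(c)
		c <- 1
	}))
}
```

Recovering like this is useful for demonstration only; in real code the usual approach is to make sure a single owner closes the channel and that no sender can run after the close.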

That's all for this article.

