用过 go 的都知道 channel,无需多言,直接开整!
1 中心数据结构
1.1 hchan
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
hchan: channel 数据结构
(1)qcount:当时 channel 中存在多少个元素;
(2)dataqsize: 当时 channel 能存放的元素容量;
(3)buf:channel 中用于存放元素的环形缓冲区;
(4)elemsize:channel 元素类型的巨细;
(5)closed:标识 channel 是否封闭;
(6)elemtype:channel 元素类型;
(7)sendx:发送元素进入环形缓冲区的 index;
(8)recvx:接纳元素所在的环形缓冲区的 index;
(9)recvq:因接纳而堕入堵塞的协程行列;
(10)sendq:因发送而堕入堵塞的协程行列;
1.2 waitq
type waitq struct {
first *sudog
last *sudog
}
waitq:堵塞的协程行列
(1)first:行列头部
(2)last:行列尾部
1.3 sudog
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// ...
isSelect bool
c *hchan
}
sudog:用于包装协程的节点
(1)g:goroutine,协程;
(2)next:行列中的下一个节点;
(3)prev:行列中的前一个节点;
(4)elem: 读取/写入 channel 的数据的容器;
(5)isSelect:标识当时协程是否处在 select 多路复用的流程中;
(6)c:标识与当时 sudog 交互的 chan.
2 结构器函数
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// ...
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return
}
(1)判别请求内存空间巨细是否越界,mem 巨细为 element 类型巨细与 element 个数相乘后得到,仅当无缓冲型 channel 时,因个数为 0 导致巨细为 0;
(2)依据类型,初始 channel,分为 无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel;
(3)假使为无缓冲型,则仅请求一个巨细为默认值 96 的空间;
(4)如若有缓冲的 struct 型,则一次性分配好 96 + mem 巨细的空间,而且调整 chan 的 buf 指向 mem 的开始方位;
(5)假使为有缓冲的 pointer 型,则分别请求 chan 和 buf 的空间,两者无需接连;
(6)对 channel 的其他字段进行初始化,包括元素类型巨细、元素类型、容量以及锁的初始化.
3 写流程
3.1 两类反常情况处理
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
(1)关于未初始化的 chan,写入操作会引发死锁;
(2)关于已封闭的 chan,写入操作会引发 panic.
3.2 case1:写时存在堵塞读协程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// ..
(1)加锁;
(2)从堵塞度协程行列中取出一个 goroutine 的封装目标 sudog;
(3)在 send 办法中,会基于 memmove 办法,直接将元素拷贝交给 sudog 对应的 goroutine;
(4)在 send 办法中会完结解锁动作.
3.3 case2:写时无堵塞读协程但环形缓冲区仍有空间
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// ...
}
(1)加锁;
(2)将当时元素添加到环形缓冲区 sendx 对应的方位;
(3)sendx++;
(4)qcount++;
(4)解锁,回来.
3.4 case3:写时无堵塞读协程且环形缓冲区无空间
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
lock(&c.lock)
// ...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
gp.waiting = nil
closed := !mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
(1)加锁;
(2)结构封装当时 goroutine 的 sudog 目标;
(3)完结指针指向,树立 sudog、goroutine、channel 之间的指向关系;
(4)把 sudog 添加到当时 channel 的堵塞写协程行列中;
(5)park 当时协程;
(6)假使协程从 park 中被唤醒,则收回 sudog(sudog能被唤醒,其对应的元素必然已经被读协程取走);
(7)解锁,回来
3.5 写流程全体串联
4 读流程
4.1 反常 case1:读空 channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// ...
}
(1)park 挂起,引起死锁;
4.2 反常 case2:channel 已封闭且内部无元素
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
}
// ...
(1)直接解锁回来即可
4.3 case3:读时有堵塞的写协程
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
(1)加锁;
(2)从堵塞写协程行列中获取到一个写协程;
(3)假使 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程;
(4)假使 channel 有缓冲区,则读取缓冲区头部元素,并将写协程元素写入缓冲区尾部后唤醒写写成;
(5)解锁,回来.
4.4 case4:读时无堵塞写协程且缓冲区有元素
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
(1)加锁;
(2)获取到 recvx 对应方位的元素;
(3)recvx++
(4)qcount–
(5)解锁,回来
4.5 case5:读时无堵塞写协程且缓冲区无元素
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
lock(&c.lock)
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
gp.waiting = nil
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
(1)加锁;
(2)结构封装当时 goroutine 的 sudog 目标;
(3)完结指针指向,树立 sudog、goroutine、channel 之间的指向关系;
(4)把 sudog 添加到当时 channel 的堵塞读协程行列中;
(5)park 当时协程;
(6)假使协程从 park 中被唤醒,则收回 sudog(sudog能被唤醒,其对应的元素必然已经被写入);
(7)解锁,回来
4.6 读流程全体串联
5 堵塞与非堵塞形式
在上述源码剖析流程中,均是以堵塞形式为主线进行讲述,忽略非堵塞形式的有关处理逻辑.
此处说明两个问题:
(1)非堵塞形式下,流程逻辑有何差异?
(2)何时会进入非堵塞形式?
5.1 非堵塞形式逻辑差异
非堵塞形式下,读/写 channel 办法经过一个 bool 型的响应参数,用以标识是否读取/写入成功.
(1)一切需要使得当时 goroutine 被挂起的操作,在非堵塞形式下都会回来 false;
(2)一切是的当时 goroutine 会进入死锁的操作,在非堵塞形式下都会回来 false;
(3)一切能当即完结读取/写入操作的条件下,非堵塞形式下会回来 true.
5.2 何时进入非堵塞形式
默认情况下,读/写 channel 都是堵塞形式,只要在 select 句子组成的多路复用分支中,与 channel 的交互会变成非堵塞形式:
ch := make(chan int)
select{
case <- ch:
default:
}
5.3 代码一览
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}
在 select 句子包裹的多路复用分支中,读和写 channel 操作会被汇编为 selectnbrecv 和 selectnbsend 办法,底层同样复用 chanrecv 和 chansend 办法,但此刻由于第三个入参 block 被设置为 false,导致后续会走进非堵塞的处理分支.
6 两种读 channel 的协议
读取 channel 时,能够依据第二个 bool 型的回来值用以判别当时 channel 是否已处于封闭状态:
ch := make(chan int, 2)
got1 := <- ch
got2,ok := <- ch
实现上述功能的原因是,两种格式下,读 channel 操作会被汇编成不同的办法:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
7 封闭
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
(1)封闭未初始化过的 channel 会 panic;
(2)加锁;
(3)重复封闭 channel 会 panic;
(4)将堵塞读协程行列中的协程节点一致添加到 glist;
(5)将堵塞写协程行列中的协程节点一致添加到 glist;
(6)唤醒 glist 当中的一切协程.