Go 源码之 chan
一、总结
chan 提供了一种在 goroutine 之间进行数据交换和同步的方法。通道能够用于操控并发拜访和同享数据,从而削减竞态条件和死锁问题,而且能够自然地处理异步事情和信号。假如你的应用程序需求在 goroutine 之间传递数据或消息,那么通道是一个不错的挑选
- 内部是一个 hchan 结构(字段见源码),环形行列 + 发送者双向链表 + 接纳者双向链表 + 锁
- channel 与 select 语句结合运用时,底层调用的还是 chansend 和 chanrecv 函数
- channel
- 结构:环形缓存、sendq、recvq;
- 流程:上锁/解锁,堵塞/非堵塞,缓冲/非缓冲,缓存入队出队,sudog 入队出队,协程休眠/唤醒
二、源码
/src/runtime/chan.go
- 一个环形行列
- 两个双向列表
(一)hchan
buf + sendx + recvx 构成环形行列
type hchan struct {
qcount uint // 行列中现存元素数量
dataqsiz uint // 行列容量(缓冲区)
buf unsafe.Pointer // 行列,指向一个动态分配的数组,用于存储 channel 中的元素
elemsize uint16 // 行列中元素巨细
closed uint32 // 0 正常 ,1 封闭
elemtype *_type // 行列中元素类型,
sendx uint // 行列(buf)已发送方位,当(sendx++)==dataqsiz,则从头开始发,sendx=0
recvx uint // 行列(buf)已接纳方位;
// 当 `sendx` 和 `recvx` 相等时,channel 中无元素,发送 / 接纳 操作堵塞
recvq waitq // 双向链表 ,FIFO 由 recv 行为(也便是读 <-ch)堵塞在 channel 上的 goroutine 行列
sendq waitq // 双向链表 ,FIFO 由 send 行为 (也便是写 ch<-) 堵塞在 channel 上的 goroutine 行列
lock mutex // 读写锁,维护hchan中的一切字段,以及waitq中一切的字段
}
// 双向链表,存储了g
type waitq struct {
first *sudog // 链表头部,协程 g 的数据结构
last *sudog // 链表尾部,协程 g 的数据结构
}
(二)创立
ch1 := make(chan int) ch2 := make(chan int,2)
底层都是调用了runtime.makechan()
- 合法性校验
- 数据类型巨细校验
- 内存溢出校验
- 初始化 hchan
- 初始化 无缓冲 hchan
- 初始化 有缓冲 && 无指针元素 hchan
- 初始化 无缓冲 && 有指针元素 hchan
- 初始化 hchan 其他元素:如 dataqsize、elemsize、elemtype、lock
// 首要逻辑:合法性验证 和 分配地址空间
// t 是指向 chantype 的指针,size 表明缓冲区巨细,0表无缓冲
func makechan(t *chantype, size int) *hchan {
elem := t.elem // 元素的类型
// ----------- 1. 合法性验证 ----------
// 数据类型巨细验证,大于1<<16时反常
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 内存对齐(下降寻址次数),大于最大内存(8字节数)时反常
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 传入的size大于堆可分配的最大内存时:内存溢出反常
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// ----------- 2. 分配地址空间 ----------
// hchanSize 为 hchan 结构巨细
// mem 为缓存区巨细
/* 依据 channel 中收发元素的类型和缓冲区的巨细初始化 runtime.hchan 和 缓冲区,分为三种状况:
* 假如不存在缓冲区,分配 hchan 结构体空间,即无缓存 channel
* 假如 channel 存储的类型不是指针类型,分配接连地址空间,包含 hchan 结构体 + 数据
* 默认状况包含指针,为 hchan 和 buf 独自分配数据地址空间
更新 hchan 结构体的数据,包含 elemsize、elemtype 和 dataqsiz
*/
var c *hchan
switch {
case mem == 0:
// 创立无缓冲的 chan ,buf==0 ,初始化 hchan
c = (*hchan)(mallocgc(hchanSize, nil, true)) // hchanSize表明空hchan需求占用的字节
c.buf = c.raceaddr() // raceaddr内部完成为:return unsafe.Pointer(&c.buf)
case elem.ptrdata == 0:
// 有缓存区,而且行列中不存在指针,分配接连地址空间,巨细为 hchanSize + mem
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// buf指针指向空hchan占用空间的末尾
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 行列包含指针类型
// 为buf独自开辟mem巨细的空间,用来保存一切的数据
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) // 元素巨细
c.elemtype = elem // 元素类型
c.dataqsiz = uint(size) // chan 缓存区巨细
lockInit(&c.lock, lockRankHchan) // 初始化锁
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
}
return c
}
(三)发送
ch <- 1
履行 runtime.chansend1(SB)
-
反常查看:
- 发送到 nil chan 中,堵塞挂起
- 往 closed chan 发送(写),则 panic
- 当时 chan 是否能够发送
- 同步发送:recvq 中存在等候接纳者,则直接唤醒并发送数据
- 异步发送:c.qcount < c.dataqsiz 缓存区空闲,则数据发送到缓存区
- 堵塞发送:当时面都不满意时 且 block = true 时:发送操作 线程堵塞 挂起,而且增加到 sendq 等候行列,直到有接纳者接纳才开释
/**
* @Description:
chansend函数首要能够概括为四部分:
反常查看、同步发送、异步发送、堵塞发送:
* @Param:c:hchan结构;ep:发送的元素;block:是否堵塞;callerpc:
* @return: true发送成功,false发送失败
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ------------------------------------ 1.反常查看 ------------------------------------
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) // 发送到 nil chan 中,堵塞挂起
throw("unreachable")
}
..........
// 当channel不为nil,此刻查看channel是否做好接纳发送操作的准备,
if !block && c.closed == 0 && full(c) {
return false // 非堵塞且未封闭: 1. 无缓存区,recvq为空 2. 有缓冲区,可是buffer已满
}
lock(&c.lock) // 先上锁
if c.closed != 0 { // chan现已封闭,则解锁
unlock(&c.lock)
panic(plainError("send on closed channel")) // 往 closed chan 发送(写),则 panic
}
// ------------------------------------ 2.同步发送 ------------------------------------
// recvq 中存在等候接纳者,则直接唤醒并发送数据
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3) // recvq 等候行列取取出 sg(sudog)并唤醒并发送数据 ep
return true
}
// ------------------------------------ 3.异步发送 ------------------------------------
// (有缓存区,没有等候接纳者,先发到缓冲区中,等有接纳者再去读)
if c.qcount < c.dataqsiz { // 存在的元素个数< 缓冲区:说明缓存区能够持续写数据
qp := chanbuf(c, c.sendx) // 获取缓存区index地址
typedmemmove(c.elemtype, qp, ep) // 数据写入buffer
c.sendx++ // 发送数据的下标++
if c.sendx == c.dataqsiz { // 当发送数据的下标等于缓冲区,表数据发送结束,从头开始
c.sendx = 0
}
c.qcount++ // 元素数量++
unlock(&c.lock) // 解锁
return true // 回来结果
}
if !block {
unlock(&c.lock) // 解锁
return false
}
// ------------------------------------ 4. 堵塞发送 ------------------------------------
// 当时面都不满意时(没有等候接纳者,没有空闲缓冲区) 且 block = true 时,发送操作 线程堵塞 挂起,直到有接纳者接纳才开释:
gp := getg()
..........
c.sendq.enqueue(mysg) // 将发送 的 sg 增加到 sendq 等候行列中
return true
}
func full(c *hchan) bool {
if c.dataqsiz == 0 { // 无缓冲
return c.recvq.first == nil
}
// 有缓冲,现有元素的个数 是否等于 缓冲区容量时(缓冲区满)
return c.qcount == c.dataqsiz
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
......
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep) // 将数据复制到接纳变量的内存地址上
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 唤醒sudog协程;下一轮调度时会唤醒这个接纳的 goroutine。
}
// 完成了等候行列的入队操作。它将一个元素增加到等候行列的末尾,并更新行列的 first 和 last 指针
func (q *waitq) enqueue(sgp *sudog) {
sgp.next = nil // 表明该元素是行列的最终一个
x := q.last // 将等候行列 q 中的最终一个元素(假如存在)赋值给变量 x。
if x == nil { // 假如行列中最终一个都没有,则行列无元素,即 x 为 nil,
sgp.prev = nil // 则将 sgp 元素的 prev 指针设为 nil,表明该元素是行列中的第一个元素,
q.first = sgp // 然后将行列的 first 和 last 指针都指向该元素,表明该元素是行列中唯一的元素。
q.last = sgp // 然后直接回来,结束入队操作。
return
}
sgp.prev = x // sgp 是新加的最终元素,需求关联前一个元素(x为原行列中最终一个元素)
x.next = sgp // 设置x的的下一个元素为新加的元素
q.last = sgp // 设置q行列的最终一个元素
}
(四)接纳
i <- ch i, ok <- ch
履行 runtime.chanrecv1(SB) 都是调用的chanrecv()
-
反常查看:
- 从 nil chan 中读取,堵塞挂起
- 从 closed chan 接纳(读),回来零值
- 当时 chan 是否能够接纳
- 同步接纳:sendq 中存在发送者,则直接唤醒并接纳数据
- 异步接纳:c.qcount 行列中有元素,则则从 buf 中读取数据
- 堵塞接纳:当时面都不满意时 且 block = true 时:接纳操作 线程堵塞 挂起,而且增加到 recvq 等候行列,直到有发送者才开释
chanrecv 函数的逻辑和 chansend 的逻辑基本共同
(五)封闭
close(ch)
closechan(c *hchan)
首要逻辑:
-
反常查看:
- 封闭 nil chan ,panic
- 封闭 closed chan,panic
- 标记 chan 为封闭状态
- 开释等候的 sudog: 唤醒并调度等候行列 recvq、sendq 中的 sudog,一切接纳者收到零值
func closechan(c *hchan) {
// ------------------------------------ 1. 反常查看 ------------------------------------
if c == nil {
panic(plainError("close of nil channel")) // 封闭 nil chan ,panic
}
lock(&c.lock) // 上锁
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) // 封闭 closed chan,panic
}
c.closed = 1 // 标识chan现已封闭
// ------------------------------------ 2. 开释等候的 sudog ------------------------------------
var glist gList // 存储 recvq、sendq 等候行列中的 sg(sudog)
for {
sg := c.recvq.dequeue() // 将 recvq 等候行列中的 sg(sudog) 增加到 glist
......
glist.push(gp)
}
for {
sg := c.sendq.dequeue() // 将 sendq 等候行列中的 sg(sudog) 增加到 glist
......
glist.push(gp)
}
unlock(&c.lock) // 解锁
for !glist.empty() { //顺次从 glist 中弹出 sg(sudog)并唤醒履行,一切接纳者收到零值
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
三、常见问题
1.为什么要运用环形行列
chan的内部运用环形行列来存取元素,每次发/收元素时,会依据sendx/recvx记载的方位从行列buf中存取元素,
所以环形行列:buf+sendx+recvx完成的,
运用环形数组完成的好处:
-
防止对数组进行复制或许移动操作
比如数组【1,2,3】,现在增加4,变为【2,3,4】,数组就需求进行复制复制操作,假如是环形行列,则直接将4增加到行列的尾部即可,
-
防止内存分配和复制的开销,从而提高程序的性能
重复使用,防止重新分配内存
2. 关于chan的操作