本文正在参加「金石计划 . 瓜分6万现金大奖」
前言
channel是golang中标志性的概念之一,很好很强大!
channel(通道),顾名思义,是一种通道,一种用于并发环境中数据传递的通道。通常结合golang中另一重要概念goroutine(go协程)运用,使得在golang中的并发编程变得清晰简洁一起又高效强大。
今天测验着读读golang对channel的完成源码,本文主要是自己个人对于Channel源码的学习笔记,需求的朋友能够参阅以下内容,希望对咱们有帮助。
channel基础结构
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx: uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
hchan结构就是channel的底层数据结构,看源码界说,能够说是十分清晰了。
- qcount:channel缓存行列中已有的元素数量
- dataqsiz:channel的缓存行列巨细(界说channel时指定的缓存巨细,这儿channel用的是一个环形行列)
- buf:指向channel缓存行列的指针
- elemsize:经过channel传递的元素巨细
- closed:channel是否关闭的标志
- elemtype:经过channel传递的元素类型
- sendx:channel中发送元素在行列中的索引
- recvx:channel中接受元素在行列中的索引
- recvq:等候从channel中接纳元素的协程列表
- sendq:等候向channel中发送元素的协程列表
- lock:channel上的锁
其间关于recvq和sendq的两个列表所用的结构waitq简略看下。
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
...
c *hchan // channel
}
能够看出waiq是一个双向链表结构,链上的节点是sudog。从sudog的结构界说能够大略看出,sudog是对g(即协程)的一个封装。用于记载一个等候在某个channel上的协程g、等候的元素elem等信息。
channel初始化
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
第一部分的3个if是对初始化参数的合法性查看。
if elem.size >= 1<<16:
查看channel元素巨细,小于2字节
if hchanSize%maxAlign != 0 || elem.align > maxAlign
没看懂(对齐?)
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size)
第一个判别缓存巨细需求大于等于0
int64(uintptr(size)) != size这一句实践是用于判别size是否为负数。因为uintptr实践是一个无符号整形,负数经过转化后会变成一个与原数完全不同的很大的正整数,而正数经过转化后并没有变化。
最后一句判别channel的缓存巨细要小于heap中能分配的巨细。_MaxMem是可分配的堆巨细。
第二部分是具体的内存分配。
元素类型为kindNoPointers的时分,既非指针类型,则直接分配(hchanSize+uintptr(size)*elem.size)巨细的连续空间。c.buf指向hchan后面的elem行列首地址。
假如channel缓存巨细为0,则c.buf实践上是没有给他分配空间的
假如类型为非kindNoPointers,则channel的空间和buf的空间是分别分配的。
channel发送
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}
channel发送,即协程向channel中发送数据,与此操作对应的go代码如c <- x。
channel发送的完成源码中,经过chansend1(),调用chansend(),其间block参数为true。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
...
}
chansend()首先对c进行判别, if c == nil:即channel没有被初始化,这个时分会直接调用gopark使得当时协程进入等候状况。而且用于唤醒的参数unlockf传的nil,即没有人来唤醒它,这样系统进入死锁。所以channel必须被初始化之后才能运用,不然死锁。
接下来是正式的发送处理,且后续操作会加锁。
lock(&c.lock)
close判别
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
假如channel已经是closed状况,解锁然后直接panic。也就是说咱们不能够向已经关闭的通道内在发送数据。
将数据发给接纳协程
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
测验从接纳等候协程行列中取出一个协程,假如有则直接数据发给它。也就是说发送到channel的数据会优先查看接纳等候行列,假如有协程等候取数,就直接给它。发完解锁,操作完成。
这儿send()办法会将数据写到从行列里取出来的sg中,经过goready()唤醒sg.g(即等候的协程),进行后续处理。
数据放到缓存
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
假如没有接纳协程在等候,则去查看channel的缓存行列是否还有空位。假如有空位,则将数据放到缓存行列中。
经过c.sendx游标找到行列中的空余方位,然后将数据存进去。移动游标,更新数据,然后解锁,操作完成。
if c.sendx == c.dataqsiz {
c.sendx = 0
}
经过这一段游标的处理能够看出,缓存行列是一个环形。
堵塞发送协程
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
假如缓存也慢了,这时分就只能堵塞住发送协程了, 等有适宜的机会了,再将数据发送出去。
getg()获取当时协程目标g的指针,acquireSudog()生成一个sudog,然后将当时协程及相关数据封装好链接到sendq列表中。然年经过goparkunlock()将其转为等候状况,并解锁。操作完成。
channel接纳
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
channel接纳,即协程从channel中接纳数据,与此操作对应的go代码如<- c。
channel接纳的完成源码中,经过chanrecv1(),调用chanrecv(),其间block参数为true。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
...
}
同发送相同,接纳也会首先查看c是否为nil,假如为nil,会调用gopark()休眠当时协程,从而终究形成死锁。
接纳操作同样先进行加锁,然后开始正式操作。
close处理
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
接纳和发送略有不同,当channel关闭而且channel的缓存行列里没有数据了,那么接纳动作会直接完毕,但不会报错。
也就是说,允许从已关闭的channel中接纳数据。
从发送等候协程中接纳
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
测验从发送等候协程列表中取出一个等候协程,假如存在,则调用recv()办法接纳数据。
这儿的recv()办法比send()办法稍微复杂一点,咱们简略分析下。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
...
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
...
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
recv()的接纳动作分为两种状况:
- c.dataqsiz == 0:即当channel为无缓存channel时,直接将发送协程中的数据,复制给接纳者。
- c.dataqsiz != 0:假如channel有缓存,则:依据缓存的接纳游标,从缓存行列中取出一个,复制给接受者
小结
channel必须初始化后才能运用;
channel关闭后,不允许在发送数据,可是还能够持续从中接纳未处理完的数据。所以尽量从发送端关闭channel;
无缓存的channel需求注意在一个协程中的操作不会形成死锁;