从鹅厂实例出发!分析Go Channel底层原理

本文是根据Go1.18.1源码的学习笔记。Channel的底层源码从Go1.14到现在的Go1.19之间几乎没有改变,这也是Go最早引入的组件之一,表现了Go并发思维:

Do not communicate by sharing memory; instead, share memory by communicating.

不要经过同享内存来通讯,⽽应经过通讯来同享内存。

从鹅厂实例出发!分析Go Channel底层原理

定论

仍是先给出定论,没时间看剖析进程的同学至少能够看一眼定论:

1. Channel实质上是由三个FIFO(First In FirstOut,先进先出)行列组成的用于协程之间传输数据的协程安全的通道;FIFO的规划是为了保障公平,让事情变得简略,原则是让等候时间最长的协程最有资格先从channel发送或接纳数据;

2. 三个FIFO行列依次是buf循环行列,sendq待发送者行列,recvq待接纳者行列。buf循环行列是巨细固定的用来寄存channel接纳的数据的行列;sendq待发送者行列,用来寄存等候发送数据到channel的goroutine的双向链表,recvq待接纳者行列,用来寄存等候从channel读取数据的goroutine的双向链表;sendq和recvq能够认为不限巨细;

3. 跟函数调用传参实质都是传值相同,channel传递数据的实质便是值复制,引证类型数据的传递也是地址复制;有从缓冲区buf地址复制数据到接纳者receiver栈内存地址,也有从发送者sender栈内存地址复制数据到缓冲区buf;

4. Channel里边参数的修改不是并发安全的,包括对三个行列及其他参数的拜访,因而需求加锁,实质上,channel便是一个有锁行列;

5. Channel 的功能跟 sync.Mutex 差不多,没有谁比谁强。Go官方之所以引荐运用Channel进行并发协程的数据交互,是由于channel的规划理念能让程序变得简略,在大型程序、高并发杂乱的运转状况中也是如此。

从鹅厂实例出发!分析Go Channel底层原理

从一个线上的内存泄漏问题谈起

去年底,团队有个线上服务产生了一个故障,该服务部署在K8S集群的容器里,经过Prometheus监控界面看到本服务的Pod的内存运用量呈锯齿状增加,到达服务设置的内存上限16G后,就会产生容器重启,看现象是产生了内存泄漏。

从鹅厂实例出发!分析Go Channel底层原理

线上服务的代码经过简化,根本逻辑如下:

package main
import (
  "errors"
  "fmt"
)
func accessMultiService() (data string, err error) {
  respAChan := make(chan string)           //无缓冲channel
  go func() {
    serviceAResp, _ := accessServiceA()
    respAChan <- serviceAResp
  }()
  _, serviceBErr := accessServiceB()
  if serviceBErr != nil {
    return "", errors.New("service B response error")
  }
  _, serviceCErr := accessServiceC()
  if serviceCErr != nil {
    return "", errors.New("service C response error")
  }
  respA := <- respAChan
  fmt.Printf("service A resp is: %s\n", respA)
  return  "success", nil
}
func accessServiceA() (string, error) {
  return "service A result", nil
}
func accessServiceB() (string, error) {
  return "service B result", errors.New("service B error")
}
func accessServiceC() (string, error) {
  return "service C result", nil
}

经过排查,是在起的一个goroutine拜访 A 服务时,运用了一个无缓冲的channel respAChan,在后续的拜访服务B,C时,产生了反常导致父协程回来,A服务的子协程里的无缓冲channel respAChan一向没有goroutine去读它,导致它一向被堵塞,无法被开释,随着请求数的增多,它地点的goroutine会一向占用内存,直到到达容器内存上限,使容器崩溃重启。

解决办法能够是:将无缓冲的channel改成有缓冲channel,而且在写入数据后封闭它,这样就不会产生goroutine一向堵塞,无法被开释的问题了。

        respAChan := make(chan string, 1)           //改为有缓冲channel
  go func() {
    serviceAResp, _ := accessServiceA()
    respAChan <- serviceAResp
    close(respAChan)                   //写入后封闭channel
  }()

从这个问题能够知道虽然我们都用过channel,却也简单因运用不当而导致线上故障。

Channel是什么?怎样用?

首要是channel分为两类:

1.无缓冲channel,能够看作“同步形式”,发送方和接纳方要同步就绪,只要在两者都 ready 的状况下,数据才能在两者间传输(后面会看到,实际上便是内存复制)。不然,恣意一方先行进行发送或接纳操作,都会被挂起,等候另一方的呈现才能被唤醒。

2.有缓冲channel称为“异步形式”,在缓冲槽可用的状况下(有剩余容量),发送和接纳操作都能够顺利进行。不然,操作的一方(如写入)同样会被挂起,直到呈现相反操作 (如接纳)才会被唤醒。

channel的根本用法有:

1.读取 <- chan

2.写入 chan <-

3.封闭 close(chan)

4.获取channel长度 len(chan)

5.获取channel容量 cap(chan)

还有一种select非堵塞拜访办法,从一切的case中选择一个不会堵塞的channel进行读写操作,或是default履行。

从鹅厂实例出发!分析Go Channel底层原理

Channel规划思维

Go言语的并发模型是CSP(Communicating Sequential Processes,通讯次序进程),提倡经过通讯同享内存而不是经过同享内存而完结通讯。

假如说goroutine是Go程序并发的履行体,channel便是它们之间的衔接。channel是能够让一个goroutine发送特定值到另一个goroutine的通讯机制。

下面有关并发评论中的线程能够替换为进程、协程或函数,实质上都是一起对同一份数据的竞赛。

先弄清楚并发和并行的区别:多线程程序在一个核的CPU上运转,便是并发。多线程程序在多个核的CPU上运转,便是并行。

单纯地将线程并发履行是没有含义的。线程与线程间需求交换数据才能表现并发履行线程的含义。

多个线程之间交换数据无非是两种办法:同享内存加互斥锁;先进先出(FIFO)将资源分配给等候时间最长的线程。

同享内存加互斥锁是C++等其他言语选用的并发线程交换数据的办法,在高并发的场景下有时分难以正确的运用,特别是在超大型、巨型的程序中,简单带来难以察觉的隐藏的问题。Go言语选用的是后者,引入channel以先进先出(FIFO)将资源分配给等候时间最长的goroutine,尽量消除数据竞赛,让程序以尽可能次序一致的办法运转。

关于理解让程序尽量次序一致的含义,能够看看Go言语内存模型选用的一个传统的根据happens-before对读写竞赛的定义:

1.修改由多个goroutines一起拜访的数据的程序必须串行化这些拜访。

2.为了完结串行拜访, 需求运用channel操作或其他同步原语(如sync和sync/atomic包中的原语)来维护数据。

3.go语句创立一个goroutine,必定产生在goroutine履行之前。

4.往一个channel中发送数据,必定产生在从这个channel 读取这个数据完结之前。

5.一个channel的封闭,必定产生在从这个channel读取到零值数据(这儿指由于close而回来的零值数据)之前。

6.从一个无缓冲channel的读取数据,必定产生在往这个channel发送数据完结之前。

假如违反了这种定义,Go会让程序直接panic或堵塞,无法往后履行。

有人说,Go没有选用同享内存加互斥锁进行协程之间的通讯,是由于这种办法功能太差,其实不是,由于channel实质也是一个有锁的行列,选用channel进行协程之间的通讯,首要是为了减少数据竞赛,在大型程序、高并发的杂乱场景下,以简略的原理完结的组件,更能让程序尽量按契合预期的、不易出错的办法履行。

Go 中用于并发协程同步数据的组件首要分为 2 大类,一个是 sync 和sync/atomic包里边的,如sync.Mutex、sync.RWMutex、sync.WaitGroup等,另一个是 channel。只要channel才是Go言语引荐的并发同步的办法,是一等公民,用户运用channel乃至不需求引入包名。

从鹅厂实例出发!分析Go Channel底层原理

Channel结构

channel的底层数据结构是hchan,在src/runtime/chan.go 中。

type hchan struct {
  qcount   uint                 // 行列中一切数据总数
  dataqsiz uint                 // 循环行列巨细
  buf      unsafe.Pointer       // 指向循环行列的指针
  elemsize uint16               // 循环行列中元素的巨细
  closed   uint32               // chan是否封闭的标识
  elemtype *_type               // 循环行列中元素的类型
  sendx    uint                 // 已发送元素在循环行列中的方位
  recvx    uint                 // 已接纳元素在循环行列中的方位
  recvq    waitq                // 等候接纳的goroutine的等候行列
  sendq    waitq                // 等候发送的goroutine的等候行列
  lock mutex                    // 控制chan并发拜访的互斥锁
}

qcount代表chan中现已接纳但还没被读取的元素的个数;

dataqsiz代表循环行列的巨细;

buf 是指向循环行列的指针,循环行列是巨细固定的用来寄存chan接纳的数据的行列;

elemtype 和 elemsiz 表明循环行列中元素的类型和元素的巨细;

sendx:待发送的数据在循环行列buffer中的方位索引;

recvx:待接纳的数据在循环行列buffer中的方位索引;

recvq 和 sendq 别离表明等候接纳数据的 goroutine 与等候发送数据的 goroutine;

从鹅厂实例出发!分析Go Channel底层原理

sendq 和 recvq 存储了当时 Channel 由于缓冲区空间不足而堵塞的 Goroutine 列表,这些等候行列运用双向链表 waitq 表明,链表中一切的元素都是 sudog 结构:

type waitq struct {
  first *sudog
  last  *sudog
}

sudog代表着等候行列中的一个goroutine,G与同步目标(指chan)关系是多对多的。一个 G 能够呈现在许多等候行列上,因而一个 G 可能有多个sudog。而且多个 G 可能正在等候同一个同步目标,因而一个目标可能有许多 sudog。sudog 是从特别池中分配出来的。运用 acquireSudog 和 releaseSudog 分配和开释它们。

从鹅厂实例出发!分析Go Channel底层原理

从鹅厂实例出发!分析Go Channel底层原理

创立Chan

Channel的创立会运用make关键字:

ch := make(chan int, 10)

编译器编译上述代码,在查看ir节点时,根据节点op不同类型,进行不同的查看,源码如下:

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {
  switch n.Op() {
  ......
  case ir.OMAKECHAN:
    n := n.(*ir.MakeExpr)
    return walkMakeChan(n, init)
  ......
}

编译器会将 make(chan int, 10) 表达式转换成 OMAKE 类型的节点,并在类型查看阶段将 OMAKE 类型的节点转换成 OMAKECHAN 类型,该类型节点会调用walkMakeChan函数处理:

func walkMakeChan(n *ir.MakeExpr, init *ir.Nodes) ir.Node {
  size := n.Len
  fnname := "makechan64"
  argtype := types.Types[types.TINT64]
  if size.Type().IsKind(types.TIDEAL) || size.Type().Size() <= types.Types[types.TUINT].Size() {
    fnname = "makechan"
    argtype = types.Types[types.TINT]
  }
  return mkcall1(chanfn(fnname, 1, n.Type()), n.Type(), init, reflectdata.TypePtr(n.Type()), typecheck.Conv(size, argtype))
}

上述代码默认调用makechan64()函数。假如在make函数中传入的 channel size 巨细在 int 范围内,引荐运用 makechan()。由于 makechan() 在 32 位的平台上更快,用的内存更少。

makechan64() 办法在src/runtime/chan.go,只是判别一下传入的入参 size 是否还在 int 范围之内:

func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }
  return makechan(t, int(size))
}

终究创立 channel 调用的仍是runtime.makechan() 函数:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem
  // 查看数据项巨细不能超过 64KB
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
        // 查看内存对齐是否正确
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }
        // 缓冲区巨细查看,判别是否溢出
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }
  var c *hchan
  switch {
  case mem == 0:
    // 行列或许元素巨细为 zero 时,无须创立buf环形行列.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // 竞态查看,利用这个地址进行同步操作.
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // 元素不是指针,分配一块接连的内存给hchan数据结构和缓冲区buf
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
                // 表明hchan后面在内存里紧跟着便是buf环形行列
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // 元素包括指针,单独分配环形行列buf
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }
        // 设置元素个数、元素类型给创立的chan
  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)
  lockInit(&c.lock, lockRankHchan)
  if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
  }
  return c
}

上面这段 makechan() 代码首要意图是生成 *hchan 目标。要点关注 switch-case 中的 3 种状况:

1.当行列或许元素巨细为 0 时,调用 mallocgc() 在堆上为 channel 拓荒一段巨细为 hchanSize 的内存空间;

2.当元素类型不是指针类型时,调用 mallocgc() 在堆上为 channel 和底层 buf 缓冲区数组拓荒一段巨细为 hchanSize + mem 接连的内存空间;

3.默认状况元素类型中有指针类型,调用 mallocgc() 在堆上别离为 channel 和 buf 缓冲区分配内存。

这儿需求解说下:当存储在 buf 中的元素不包括指针时,hchan 中也不包括 GC 关怀的指针。buf 指向一段相同元素类型的内存,elemtype 固定不变。遭到废物收回器的限制,指针类型的缓冲 buf 需求单独分配内存。

channel本身是引证类型,其创立全部调用的是 mallocgc(),在堆上拓荒的内存空间,阐明 channel 本身会被 GC 主动收回。

从鹅厂实例出发!分析Go Channel底层原理

发送数据

向 channel 中发送数据运用 ch <- 1 代码,编译器在编译它时,会把它解析成OSEND节点:

func walkExpr1(n ir.Node, init *ir.Nodes) ir.Node {
  switch n.Op() {
  ......
  case ir.OSEND:
    n := n.(*ir.SendStmt)
    return walkSend(n, init)
  ......
}

对OSEND节点会调用 walkSend()函数处理:

func walkSend(n *ir.SendStmt, init *ir.Nodes) ir.Node {
  n1 := n.Value
  n1 = typecheck.AssignConv(n1, n.Chan.Type().Elem(), "chan send")
  n1 = walkExpr(n1, init)
  n1 = typecheck.NodAddr(n1)
  return mkcall1(chanfn("chansend1", 2, n.Chan.Type()), nil, init, n.Chan, n1)
}

运转时的chansend1()函数实际调用的是chansend():

func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

chansend()函数的首要逻辑是:

1.在chan为 nil 未初始化的状况下,关于select这种非堵塞的发送,直接回来 false;关于堵塞的发送,将 goroutine 挂起,而且永久不会回来。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // 假如chan为nil
  if c == nil {
                // 关于select这种非堵塞的发送,直接回来
    if !block {
      return false
    }
                // 关于堵塞的通道,将 goroutine 挂起
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
        ......
}

2.非堵塞发送的状况下,当 channel 不为 nil,而且 channel 没有封闭时,假如没有缓冲区且没有接纳者receiver,或许缓冲区现已满了,回来 false。

        if !block && c.closed == 0 && full(c) {
    return false
  }

full() 办法作用是判别在 channel 上发送是否会堵塞,用来判别的参数是qcount,c.recvq.first,dataqsiz,前两个变量都是单字长的,所以对它们单个值的读操作是原子性的。dataqsiz字段,它在创立完 channel 今后是不可变的,因而它能够安全的在恣意时刻读取。

func full(c *hchan) bool {
  // 假如循环行列巨细为0
  if c.dataqsiz == 0 {
    // 假定指针读取是近似原子性的,这儿用来判别没有接纳者
    return c.recvq.first == nil
  }
  // 行列满了
  return c.qcount == c.dataqsiz
}

3.接下来,对chan加锁,判别chan不是封闭状况,再从recvq行列中取出一个接纳者,假如接纳者存在,则直接向它发送消息,绕过循环行列buf,此刻,由于有接纳者存在,则循环行列buf必定是空的。

        ......
        // 对chan加锁
        lock(&c.lock)
        // 查看chan是否封闭
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }
        // 从 recvq 中取出一个接纳者
  if sg := c.recvq.dequeue(); sg != nil {
    // 假如接纳者存在,直接向该接纳者发送数据,绕过循环行列buf
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }
        ......

send() 函数首要完结了 2 件事:调用 sendDirect() 函数将数据复制到了接纳者的内存地址上;调用 goready() 将等候接纳的堵塞 goroutine 的状况从 Gwaiting 或许 Gscanwaiting 改变成 Grunnable。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ......
  if sg.elem != nil {
                // 直接把要发送的数据复制到receiver的内存地址
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
        // 唤醒等候的接纳者goroutine
  goready(gp, skip+1)
}

4.回到chansend()办法,接下来是有缓冲区的异步发送的逻辑。

// 假如缓冲区没有满,直接将要发送的数据复制到缓冲区
        if c.qcount < c.dataqsiz {
    // 找到要发送数据到循环行列buf的索引方位
    qp := chanbuf(c, c.sendx)
    ......
                // 数据复制到循环行列中
    typedmemmove(c.elemtype, qp, ep)
                // 将待发送数据索引加1,由于是循环行列,假如到了结尾,从0开始
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
                // chan中元素个数加1,开释锁回来true
    c.qcount++
    unlock(&c.lock)
    return true
  }

假如缓冲区buf还没有满,调用 chanbuf() 获取 sendx 索引的元素指针值。调用 typedmemmove() 办法将发送的值复制到缓冲区 buf 中。复制完结,增加 sendx 索引下标值和 qcount 个数。

5.假如履行前面的进程还没有成功发送,就表明缓冲区没有空间了,而且也没有任何接纳者在等候。后面需求将 goroutine 挂起然后等候新的接纳者了。

   // 缓冲区没有空间,关于select这种非堵塞调用直接回来false
        if !block {
    unlock(&c.lock)
    return false
  }
  // 下面的逻辑是将当时goroutine挂起
        // 调用 getg()办法获取当时goroutine的指针,用于绑定给一个 sudog
  gp := getg()
        // 调用 acquireSudog()办法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好sudog要发送的数据和状况。比如发送的 Channel、是否在 select 中和待发送数据的内存地址等等。
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
        // 调用 c.sendq.enqueue 办法将装备好的 sudog 参加待发送的等候行列
  c.sendq.enqueue(mysg)
  atomic.Store8(&gp.parkingOnChan, 1)
        // 调用gopark办法挂起当时goroutine,状况为waitReasonChanSend,堵塞等候channel接纳者的激活
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  // 终究,KeepAlive() 保证发送的值保持活动状况,直到接纳者将其复制出来
  KeepAlive(ep)

6.chansend()办法终究的代码是当goroutine唤醒今后,免除堵塞的状况。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ......
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  closed := !mysg.success
  gp.param = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  mysg.c = nil
  releaseSudog(mysg)
  if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
  return true
}

综上所述:

1.首要select这种非堵塞的发送,判别两种状况;

2.然后是一般的堵塞调用,先判别recvq等候接纳行列是否为空,不为空阐明缓冲区中没有内容或许是一个无缓冲channel;

3.假如recvq有接纳者,则缓冲区必定为空,直接从recvq中取出一个goroutine,然后写入数据,接着唤醒 goroutine,完毕发送进程;

4.假如缓冲区有空余的方位,写入数据到缓冲区,完结发送;

5.假如缓冲区满了,那么就把发送数据的goroutine放到sendq中,进入睡觉,等候该goroutine被唤醒。

从鹅厂实例出发!分析Go Channel底层原理

接纳数据

从channel中接纳数据的代码是:

i <- ch
i, ok <- ch

经过编译器的处理,会解析成ORECV节点,后者会在类型查看阶段被转换成 OAS2RECV 类型。终究,这两种不同的 channel 接纳办法会转换成 runtime.chanrecv1 和 runtime.chanrecv2 两种不同函数的调用,可是终究中心逻辑仍是在 runtime.chanrecv 中。

下面直接看chanrecv()办法的逻辑:

1.chanrecv()办法有两个回来值,selected, received bool,前者表明是否接纳到值,后者表明接纳的值是否封闭后发送的。有三种状况:假如是非堵塞的状况,没有数据能够接纳,则回来 (false,flase);假如 chan 现已封闭了,将 ep 指向的值置为 0值,而且回来 (true, false);其它状况回来值为 (true,true),表明成功从 chan 中获取到了数据,且是chan未封闭发送。

// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
         ......
}

2.首要判别假如chan为空,且是select这种非堵塞调用,那么直接回来 (false,false),不然堵塞当时的goroutine。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ......
        // 假如c为空
  if c == nil {
                // 假如c为空且是非堵塞调用,那么直接回来 (false,false)
    if !block {
      return
    }
                //堵塞当时的goroutine
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
        ......
}

3.假如是非堵塞调用,经过empty()办法原子判别是无缓冲chan或许是chan中没有数据且chan没有封闭,则回来(false,false),假如chan封闭,为了防止查看期间的状况改变,二次调用empty()进行原子查看,假如是无缓冲chan或许是chan中没有数据,回来 (true, false),这儿的第一个true表明chan封闭后读取的 0 值。

        //非堵塞调用,经过empty()判别是无缓冲chan或许是chan中没有数据
  if !block && empty(c) {
    // 假如chan没有封闭,则直接回来 (false, false)
    if atomic.Load(&c.closed) == 0 {
      return
    }
                // 假如chan封闭, 为了防止查看期间的状况改变,二次调用empty()进行原子查看,假如是无缓冲chan或许是chan中没有数据,回来 (true, false)
    if empty(c) {
      if raceenabled {
        raceacquire(c.raceaddr())
      }
      if ep != nil {
        typedmemclr(c.elemtype, ep)
      }
      return true, false
    }
  }
func empty(c *hchan) bool {
  // c.dataqsiz 是不可变的
  if c.dataqsiz == 0 {
    return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
  }
  return atomic.Loaduint(&c.qcount) == 0
}

4.接下来堵塞调用的逻辑,chanrecv办法对chan加锁,判别chan假如现已封闭,而且chan中没有数据,回来 (true,false),这儿的第一个true表明chan封闭后读取的 0 值。

        ......
        // 对chan加锁
        lock(&c.lock)
        // 假如现已封闭,而且chan中没有数据,回来 (true,false)
  if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
        ......

5.接下来,从发送行列中获取一个等候发送的 goroutine,即取出等候行列队头的 goroutine。假如缓冲区的巨细为 0,则直接从发送方接纳值。不然,对应缓冲区满的状况,从行列的头部接纳数据,发送者的值添加到行列的结尾(此刻行列已满,因而两者都映射到缓冲区中的同一个下标)。这儿需求注意,由于有发送者在等候,所以假如有缓冲区,那么缓冲区必定是满的。

       ......
       // 从发送者行列获取等候发送的 goroutine  
       if sg := c.sendq.dequeue(); sg != nil {
    //在 channel 的发送行列中找到了等候发送的 goroutine,取出队头等候的 goroutine。假如缓冲区的巨细为 0,则直接从发送方接纳值。不然,对应缓冲区满的状况,从行列的头部接纳数据,发送者的值添加到行列的结尾(此刻行列已满,因而两者都映射到缓冲区中的同一个下标)
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      // 从发送者sender里边复制数据
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    // 行列是满的
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      racenotify(c, c.recvx, nil)
      racenotify(c, c.recvx, sg)
    }
    // 从缓冲区复制数据给接纳者receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // 从发送者sender复制数据到缓冲区
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
        // 唤醒发送者
  goready(gp, skip+1)
}

recv()办法先判别chan是否无缓冲,假如是,则直接从发送者sender那里复制数据,假如有缓存区,由于有发送者,此刻缓冲区的循环行列必定是满的,会先从缓冲区复制数据给接纳者receiver,然后将发送者的数据复制到缓冲区,满足FIFO。终究,唤醒发送者的goroutine。

6.接下来,是异步接纳逻辑,假如缓冲区有数据,直接从缓冲区接纳数据,行将缓冲区recvx指向的数据复制到ep接纳地址,而且将recvx加1。

   ......
         // 假如缓冲区有数据
         if c.qcount > 0 {
    // 直接从缓冲区接纳数据
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      racenotify(c, c.recvx, nil)
    }
                // 接纳数据地址ep不为空,直接从缓冲区复制数据到ep
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
                // 待接纳索引加1
    c.recvx++
                // 循环行列,假如到了结尾,从0开始
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
                // 缓冲区数据减1
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
        ......

7.然后,是缓冲区没有数据的状况;假如是select这种非堵塞读取的状况,直接回来(false, false),表明获取不到数据;不然,会获取sudog绑定当时接纳者goroutine,调用gopark()挂起当时接纳者goroutine,等候chan的其他发送者唤醒。

        ......
        // 假如是select非堵塞读取的状况,直接回来(false, false)
        if !block {
    unlock(&c.lock)
    return false, false
  }
  // 没有发送者,挂起当时goroutine
        // 获取当时 goroutine 的指针,用于绑定给一个 sudog
  gp := getg()
        // 调用 acquireSudog() 办法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好 sudog 要发送的数据和状况
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
        // 将装备好的 sudog 参加待发送的等候行列
  c.recvq.enqueue(mysg)
  atomic.Store8(&gp.parkingOnChan, 1)
        // 挂起当时 goroutine
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
        ......

8.终究,当时goroutine被唤醒,完结chan数据的接纳,之后进行参数查看,免除chan绑定,并开释sudog。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ......
        // 当时goroutine被唤醒,完结chan数据的接纳,之后进行参数查看,免除chan绑定,并开释sudog
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  success := mysg.success
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, success
}

综上剖析,从chan接纳数据的流程如下:

1.也是先判别select这种非堵塞接纳的两种状况(block为false);然后是加锁进行堵塞调用的逻辑;

2.同步接纳:假如发送者行列sendq不为空,且没有缓存区,直接从sendq中取出一个goroutine,读取当时goroutine中的消息,唤醒goroutine, 完毕读取的进程;

3.同步接纳:假如发送者行列sendq不为空,阐明缓冲区现已满了,移动recvx指针的方位,取出一个数据,一起在sendq中取出一个goroutine,复制里边的数据到buf中,完毕当时读取;

4.异步接纳:假如发送者行列sendq为空,且缓冲区有数据,直接在缓冲区取出数据,完结本次读取;

5.堵塞接纳:假如发送者行列sendq为空,且缓冲区没有数据。将当时goroutine参加recvq,进入睡觉,等候被发送者goroutine唤醒。

从鹅厂实例出发!分析Go Channel底层原理

封闭Chan

封闭chan的代码是close(ch),编译器会将其转为调用 runtime.closechan() 办法。

func closechan(c *hchan) {
        // 假如chan为空,此刻封闭它会panic
  if c == nil {
    panic(plainError("close of nil channel"))
  }
        // 加锁
  lock(&c.lock)
        // 假如chan现已封闭了,再次封闭它会panic
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
  if raceenabled {
    callerpc := getcallerpc()
    racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
    racerelease(c.raceaddr())
  }
        // 设置chan的closed状况为封闭
  c.closed = 1
        // 声明一个寄存一切接纳者和发送者goroutine的list
  var glist gList
  //获取recvq里的一切接纳者
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = unsafe.Pointer(sg)
    sg.success = false
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
                // 放入行列glist中
    glist.push(gp)
  }
  // 获取一切发送者
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = unsafe.Pointer(sg)
    sg.success = false
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
                // 放入行列glist中
    glist.push(gp)
  }
  unlock(&c.lock)
  // 唤醒一切的glist中的goroutine 
  for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }
}

封闭chan的进程是:

1.先查看反常状况,当 Channel 是一个 nil 空指针或许封闭一个现已封闭的 channel 时,Go 言语运转时都会直接 panic。

2.封闭的首要工作是开释一切的接纳者和发送者:将一切的接纳者 readers 的 sudog 等候行列(recvq)参加到待清除行列 glist 中。注意这儿是先收回接纳者,由于从一个封闭的 channel 中读数据,不会产生 panic,顶多读到一个默认零值。再收回发送者 senders,将发送者的等候行列 sendq 中的 sudog 放入待清除行列 glist 中。注意这儿可能会产生 panic,由于往一个封闭的 channel 中发送数据,会产生 panic。

从鹅厂实例出发!分析Go Channel底层原理

总结

Channel是根据有锁行列完结数据在不同协程之间传输的通道,数据传输的办法其实便是值传递,引证类型数据的传递是地址复制。

有别于经过同享内存加锁的办法在协程之间传输数据,经过channel传递数据,这些数据的一切权也能够在goroutine之间传输。当 goroutine 向 channel 发送值时,我们能够看到 goroutine 开释了一些值的一切权。当一个 goroutine 从一个 channel 接纳到一个值时,能够看到 goroutine 获得了一些值的一切权。

channel常见的读写反常状况如下表所示:

channel操作 chan为nil 封闭的chan 非空、未封闭的chan
读 <- chan 堵塞 里边的内容读完了,之后获取到的是类型的零值 堵塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等候发送者时会堵塞
写 chan <- 堵塞 panic 堵塞或正常写入数据。非缓冲型 channel 没有等候接纳者或缓冲型 channel buf 满时会被堵塞
封闭 close(chan) panic panic 封闭成功

腾讯工程师技术干货直达:

  1. 快收藏!最全GO言语完结规划形式【下】

  2. 如何成为优异工程师之软技能篇

  3. 如何更好地运用Kafka?

  4. 一文带你深入了解HTTP

从鹅厂实例出发!分析Go Channel底层原理

阅览原文