0%

Golang channel 源码深度剖析

channel 是 Golang 中一个非常重要的特性,也是 Golang CSP 并发模型的一个重要体现。简单来说就是,goroutine 之间可以通过 channel 进行通信。

channel 在 Golang 如此重要,在代码中使用频率非常高,以至于不得不好奇其内部实现。本文将基于 go 1.13 的源码,分析 channel 的内部实现原理。

channel 的基本使用

在正式分析 channel 的实现之前,我们先看下 channel 的最基本用法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import "fmt"

func main() {
c := make(chan int)

go func() {
c <- 1 // send to channel
}()

x := <-c // recv from channel

fmt.Println(x)
}

在以上代码中,我们通过 make(chan int) 来创建了一个类型为 int 的 channel。
在一个 goroutine 中使用 c <- 1 将数据发送到 channel 中。在主 goroutine 中通过 x := <- c 从 channel 中读取数据并赋值给 x。

以上代码对应了 channel 的两种基本操作:send 操作 c <- 1 和 recv 操作 x := <- c, 分别表示发送数据到 channel 和从 channel 中接收数据。

此外,channel 还分为有缓存 channel 和无缓存 channel。上述代码中,我们使用的是无缓冲的 channel。对于无缓冲的 channel,如果当前没有其他 goroutine 正在接收 channel 数据,则发送方会阻塞在发送语句处。

我们可以在 channel 初始化时指定缓冲区大小,例如,make(chan int, 2) 则指定缓冲区大小为 2。在缓冲区未满之前,发送方无阻塞地可以往 channel 发送数据,无需等待接收方准备好。而如果缓冲区已满,则发送方依然会阻塞。

channel 对应的底层实现函数

在探究 channel 源码之前,我们至少需要先找到 channel 在 Golang 的具体实现在哪。因为我们在使用 channel 时,用的是 <- 符号,并不能直接在 go 源码中找到其实现。但是 Golang 的编译器必然会将 <- 符号翻译成底层对应的实现。

我们可以使用 Go 自带的命令: go tool compile -N -l -S hello.go, 将代码翻译成对应的汇编指令。

或者,直接可以使用 Compiler Explorer 这个在线工具。对于上述示例代码可以直接在这个链接看其汇编结果: go.godbolt.org/z/3xw5Cj

channel 汇编指令

通过仔细查看以上示例代码对应的汇编指令,可以发现以下的对应关系:

  1. channel 的构造语句 make(chan int), 对应的是 runtime.makechan 函数
  2. 发送语句 c <- 1, 对应的是 runtime.chansend1 函数
  3. 接收语句 x := <- c, 对应的是 runtime.chanrecv1 函数

以上几个函数的实现都位于 go 源码中的 runtime/chan.go 代码文件中。我们接下来针对这几个函数,探究下 channel 的实现。

channel 的构造

channel 的构造语句 make(chan int),将会被 golang 编译器翻译为 runtime.makechan 函数, 其函数签名如下:

1
func makechan(t *chantype, size int) *hchan

其中,t *chantype 即构造 channel 时传入的元素类型。size int 即用户指定的 channel 缓冲区大小,不指定则为 0。该函数的返回值是 *hchan。hchan 则是 channel 在 golang 中的内部实现。其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type hchan struct {
qcount uint // buffer 中已放入的元素个数
dataqsiz uint // 用户构造 channel 时指定的 buf 大小
buf unsafe.Pointer // buffer
elemsize uint16 // buffer 中每个元素的大小
closed uint32 // channel 是否关闭,== 0 代表未 closed
elemtype *_type // channel 元素的类型信息
sendx uint // buffer 中已发送的索引位置 send index
recvx uint // buffer 中已接收的索引位置 receive index
recvq waitq // 等待接收的 goroutine list of recv waiters
sendq waitq // 等待发送的 goroutine list of send waiters

lock mutex
}

hchan 中的所有属性大致可以分为三类:

  1. buffer 相关的属性。例如 buf、dataqsiz、qcount 等。 当 channel 的缓冲区大小不为 0 时,buffer 中存放了待接收的数据。使用 ring buffer 实现。
  2. waitq 相关的属性,可以理解为是一个 FIFO 的标准队列。其中 recvq 中是正在等待接收数据的 goroutine,sendq 中是等待发送数据的 goroutine。waitq 使用双向链表实现。
  3. 其他属性,例如 lock、elemtype、closed 等。

makechan 的整个过程基本都是一些合法性检测和对 bufferhchan 等属性的内存分配,此处不再进行深入讨论了,有兴趣的可以直接看此处的源码。

通过简单分析 hchan 的属性,我们可以知道其中有两个重要的组件,bufferwaitqhchan 所有行为和实现都是围绕这两个组件进行的。

向 channel 中发送数据

channel 的发送和接收流程很相似,我们先分析下 channel 的发送过程 (如 c <- 1), 对应于 runtime.chansend 函数的实现。

在尝试向 channel 中发送数据时,如果 recvq 队列不为空,则首先会从 recvq 中头部取出一个等待接收数据的 goroutine 出来。并将数据直接发送给该 goroutine。代码如下:

1
2
3
4
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

recvq 中是正在等待接收数据的 goroutine。当某个 goroutine 使用 recv 操作 (例如,x := <- c),如果此时 channel 的缓存中没有数据,且没有其他 goroutine 正在等待发送数据 (即 sendq 为空),会将该 goroutine 以及要接收的数据地址打包成 sudog 对象,并放入到 recvq 中。

继续接着讲上面的代码,如果此时 recvq 不为空,则调用 send 函数将数据拷贝到对应的 goroutine 的堆栈上。

send 函数的实现主要包含两点:

  1. memmove(dst, src, t.size) 进行数据的转移,本质上就是一个内存拷贝。
  2. goready(gp, skip+1) goready 的作用是唤醒对应的 goroutine。

而如果 recvq 队列为空,则说明此时没有等待接收数据的 goroutine,那么此时 channel 会尝试把数据放到缓存中。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if c.qcount < c.dataqsiz {
// 相当于 c.buf[c.sendx]
qp := chanbuf(c, c.sendx)
// 将数据拷贝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

以上代码的作用其实非常简单,就是把数据放到 buffer 中而已。此过程涉及了 ring buffer 的操作,其中 dataqsiz 代表用户指定的 channel 的 buffer 大小,如果不指定则默认为 0。其他具体的详细操作后续过程会在 ring buffer 一节详细讲到。

如果用户使用的是无缓冲 channel 或者此时 buffer 已满,则 c.qcount < c.dataqsiz 条件不会满足, 以上流程也并不会执行到。此时会将当前的 goroutine 以及要发送的数据放入到 sendq 队列中,同时会切出该 goroutine。整个流程对应代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 将 goroutine 转入 waiting 状态,并解锁
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

以上代码中,goparkunlock 就是解锁传入的 mutex,并切出该 goroutine,将该 goroutine 置为 waiting 状态。gopark 和上面的 goready 对应,互为逆操作。goparkgoready 在 runtime 的源码中会经常遇到,涉及了 goroutine 的调度过程,这里就不再深入讨论,以后会单独写一篇文章讲解。

调用 gopark 后,对于用户侧来看,该向 channel 发送数据的代码语句会进行阻塞。

以上过程就是 channel 的发送语句 (如,c <- 1) 的内部工作流程,同时整个发送过程都使用 c.lock 进行加锁,保证并发安全。

简单来说,整个流程如下:

  1. 检查 recvq 是否为空,如果不为空,则从 recvq 头部取一个 goroutine,将数据发送过去,并唤醒对应的 goroutine 即可。
  2. 如果 recvq 为空,则将数据放入到 buffer 中。
  3. 如果 buffer 已满,则将要发送的数据和当前 goroutine 打包成 sudog 对象放入到 sendq 中。并将当前 goroutine 置为 waiting 状态。

从 channel 中接收数据的过程基本与发送过程类似,此处不再赘述了。具体接收过程涉及到的 buffer 的相关操作,会在后面进行详细的讲解。

这里需要注意的是,channel 的整个发送过程和接收过程都使用 runtime.mutex 进行加锁。runtime.mutex 是 runtime 相关源码中常用到的一个轻量级锁。整个过程并不是最高效的 lockfree 的做法。golang 在这里有个 issue:go/issues#8899,给出了 lockfree 的 channel 的方案。

channel 的 ring buffer 实现

channel 中使用了 ring buffer(环形缓冲区) 来缓存写入的数据。ring buffer 有很多好处,而且非常适合用来实现 FIFO 式的固定长度队列。

在 channel 中,ring buffer 的实现如下:

channel 中 ring buffer 的实现

hchan 中有两个与 buffer 相关的变量: recvxsendx。其中 sendx 表示 buffer 中可写的 index, recvx 表示 buffer 中可读的 index。 从 recvxsendx 之间的元素,表示已正常存放入 buffer 中的数据。

我们可以直接使用 buf[recvx] 来读取到队列的第一个元素,使用 buf[sendx] = x 来将元素放到队尾。

buffer 的写入

当 buffer 未满时,将数据放入到 buffer 中的操作如下:

1
2
3
4
5
6
7
8
qp := chanbuf(c, c.sendx)
// 将数据拷贝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++

其中 chanbuf(c, c.sendx) 相当于 c.buf[c.sendx]。以上过程非常简单,就是将数据拷贝到 buffer 的 sendx 的位置上。

接着,将 sendx 移到下一个位置上。如果 sendx 已到达最后一位,则将其置为 0,这是一个典型的头尾相连的做法。

buffer 的读取

当 buffer 未满时,此时 sendq 里面也一定是空的 (因为如果 buffer 未满,用于发送数据的 goroutine 肯定不会排队,而是直接放数据到 buffer 中,具体逻辑参考上文向 channel 发送数据一节),这时候对于 channel 的读取过程 chanrecv 就比较简单了,直接从 buffer 中读取即可,也是一个移动 recvx 的过程。与上文 buffer 的写入基本一致。

sendq 里面有已等待的 goroutine 的时候,此时 buffer 一定是满的。这个时候 channel 的读取逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 相当于 c.buf[c.recvx]
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz

以上代码中,ep 接收数据的变量对应的地址。例如,在 x := <- c 中,表示变量 x 的地址。
sg 代表从 sendq 中取出的第一个 sudog。并且:

  1. typedmemmove(c.elemtype, ep, qp) 表示 buffer 中的当前可读元素拷贝到接收变量的地址处。
  2. typedmemmove(c.elemtype, qp, sg.elem) 表示将 sendq 中 goroutine 等待发送的数据拷贝到 buffer 中。因为此后进行了 recv++, 因此相当于把 sendq 中的数据放到了队尾。

简单来说,这里 channel 将 buffer 中队首的数据拷贝给了对应的接收变量,同时将 sendq 中的元素拷贝到了队尾,这样可以才可以做到数据的 FIFO(先入先出)。

接下来可能有点绕,c.sendx = c.recvx, 这句话实际的作用相当于 c.sendx = (c.sendx+1) % c.dataqsiz,因为此时 buffer 依然是满的,所以 sendx == recvx 是成立的。

总结

channel 作为 golang 中最常用设施,了解其源码可以帮助我们更好的理解和使用。同时也不会过于迷信和依赖 channel 的性能,channel 就目前的设计来说也还有更多的优化空间。

参考

cyhone wechat