继上篇 Go 底层原理丨锁,本篇将进入 Go 语言中关于通道(channel)底层原理的探讨。在 Rust 中,笔者参考 Mara Bos 的 《Rust Atomics and Locks》 实现了一个 oneshot channel,感兴趣的读者也可以参阅 Rust 实战丨手写一个 oneshot channel

特此声明,本篇是笔者基于 Go 1.25.3 版本源码、并与 Google Gemini 3Pro 共创所作,非常庆幸在当今 AI 时代下获取知识已是如此便利,且也为学习者从第一性原理理解所学知识大大降低了门槛。不过本篇的篇章安排和叙述逻辑,均由笔者把控和审阅,欢迎放心阅读。

结论先行

channel 底层是 hchan 结构体,包含了:

  • 一个环形缓存队列;
  • 接受者队列、发送者队列;
  • 锁;
  • 关闭标志;

发送:chansend()

  • 直接发送给阻塞中的接受者
  • 塞入缓存
  • 休眠等待

接收:chanrecv()

  • 直接接收阻塞中的发送者的数据
  • 从缓存拿
  • 休眠等待

1. 数据结构 hchan

channel 的底层数据结构 hchan 源码位于 runtime/chan.go#L34,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
bubble *synctestBubble

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

其中以下五个字段组成了一个 环形缓冲队列

1
2
3
4
5
qcount   uint           // 当前在队列中的数据个数
dataqsiz uint // 环形队列大小
buf unsafe.Pointer // 指向环形队列的指针
elemsize uint16 // 每个数据大小
elemtype *_type // 数据类型

环形缓存可以大幅降低 GC 的开销。

其中还有四个字段组成了两个 链表

1
2
3
4
sendx    uint   // 下次要发送的数据的 index
recvx uint // 下个要接收的数据的 index
recvq waitq // 接受者等待队列
sendq waitq // 发送者等待队列

还有一个

1
lock mutex		// 保存 hchan 中的所有字段

互斥锁并不是排队发送 / 接收数据,它保护的是 hchan 结构体本身。

还有一个标记:

1
closed   uint32   // 标记 channel 是否已经关闭

2. 创建 makechan

创建 channel 的逻辑位于 runtime/chan.go#L75

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// makechan 创建一个新的 channel
// t: channel 的类型信息(包含元素类型)
// size: channel 的缓冲区大小(0 表示无缓冲)
func makechan(t *chantype, size int) *hchan {
elem := t.Elem

// === GC 优化说明 ===
// 当 buf 中的元素不包含指针时,hchan 对 GC 来说不包含需要追踪的指针
// - buf 指针指向同一次分配的内存(或单独分配的不含指针的内存)
// - elemtype 是持久化的类型元数据
// - sudog 由其所属线程引用,不会被回收
// TODO: 当 GC 支持移动对象时需要重新考虑这个设计

// === 三种分配策略 ===
var c *hchan
switch {
case mem == 0:
// 策略1: 无缓冲 channel 或元素大小为 0
// 只需分配 hchan 结构体本身,不需要缓冲区
// 示例:make(chan int, 0) 或 make(chan struct{}, 10)
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector 使用这个地址作为同步点
// 即使没有实际的 buf,也需要一个地址用于竞态检测
c.buf = c.raceaddr()

case !elem.Pointers():
// 策略2: 元素不包含指针(如 int, float, struct{int,int} 等)
// GC 优化:hchan 和 buf 在一次分配中完成,减少 GC 扫描开销
// 内存布局:[hchan 结构][buf 数组]
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// buf 指向紧跟在 hchan 后面的内存
c.buf = add(unsafe.Pointer(c), hchanSize)

default:
// 策略3: 元素包含指针(如 *int, string, slice, map 等)
// 必须分开分配,让 GC 能够正确追踪 buf 中的指针
// new(hchan) 会将 hchan 分配在 GC 扫描的内存区域
c = new(hchan)
// mallocgc 的第二个参数传入 elem 类型,让 GC 知道如何扫描这块内存
c.buf = mallocgc(mem, elem, true)
}

// === 初始化 hchan 字段 ===

c.elemsize = uint16(elem.Size_) // 元素大小(已检查不超过 uint16)
c.elemtype = elem // 元素类型信息(用于类型安全的内存操作)
c.dataqsiz = uint(size) // 循环队列容量

// 如果当前 goroutine 在 synctest bubble 中,关联到 channel
// synctest 是用于确定性测试的机制
if b := getg().bubble; b != nil {
c.bubble = b
}

// 初始化互斥锁,指定锁的等级(用于死锁检测)
lockInit(&c.lock, lockRankHchan)
return c
}

策略 1:无缓冲/零大小元素

1
2
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
  • 无缓冲 channel:size = 0,数据直接在 goroutine 间传递

  • 零大小元素:struct{},不需要实际存储空间

策略 2:元素不含指针

1
2
3
case !elem.Pointers():
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
  • 一次分配:减少内存碎片,提高缓存局部性

  • GC 优化:整块内存标记为"无指针",GC 扫描时可以跳过

  • 内存布局:

    1
    2
    低地址 → 高地址
    [hchan 结构体][buf[0]][buf[1]]...[buf[n-1]]

策略 3:元素含指针

1
2
3
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
  • mallocgc 的第二个参数 elem 告诉 GC 这块内存的类型

  • GC 需要递归扫描 buf 中的每个元素,查找其中的指针

  • 如果用策略 2,GC 无法正确追踪 buf 中的指针,导致对象被错误回收

示例:

1
2
3
4
5
type Node struct {
Value int
Next *Node // 指针!
}
ch := make(chan *Node, 10) // 使用策略 3

3. 发送 chansend

  1. 对整个 channel 上锁;
  2. 检查 channel 是否已经关闭,若关闭,这 panic;
  3. 检查是否有正在等待中的协程:
    1. 有的话,直接将数据拷贝给它,然后唤醒它;
    2. 没有,则检查缓存队列是否已满:
      1. 没有满,则将数据塞入缓存队列中;
      2. 已满,则把自己包装成 sudog 放入 sendq 队列,休眠并解锁,等待唤醒。被唤醒后数据已经被取走了,当下 sudog 负责维护其他的数据;
  4. 解锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 快速路径:在非阻塞情况(select)下,如果 channel 已经满了,快速返回
if !block && c.closed == 0 && full(c) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// 1. 上锁修改队列
lock(&c.lock)

// 2. 不允许往已关闭的 channel 发送数据
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// 3. 检查是否有阻塞中的接收者,如果有,取出一个,直接将数据交付给它
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 4. 没有接受者,且缓冲区还有位置,则数据进入缓冲区,直接返回
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

// 5. 非阻塞,返回 false
if !block {
unlock(&c.lock)
return false
}

// 6. 没有接受者,缓冲区也没有位置,且是阻塞队列,则当前协程入队休眠,等待接受者唤醒
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 入队
gp.parkingOnChan.Store(true)
reason := waitReasonChanSend
if c.bubble != nil {
reason = waitReasonSynctestChanSend
}

// 7. 陷入阻塞,等待唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
KeepAlive(ep)

// 8. 被唤醒了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil

// 9. 被唤醒的时候,数据其实已经被取走了,mysg 负责维护其他数据
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}

4. 接收 chanrecv

  1. 对整个 channel 上锁;
  2. 如果 channel 已经关闭,且缓存中没有数据,如果这个时候 eq 指向的地址有数据,则清空数据;
  3. 检查是否有等待中的 sender:
    1. 有,则看 channel 有无缓存:
      1. 没有,则直接从 sender 中取走数据,唤醒 sender;
      2. 有,则说明缓存已满,从缓存队列队头取走数据,然后将 sender 数据塞到队尾,唤醒 sender;
    2. 无,则看 channel 有无缓存:
      1. 有,则直接从缓存中取走数据,维护队列索引,解锁返回;
      2. 无,则将自己包装成 sudog,放入 recvq 休眠等待唤醒,被唤醒的时候,sender 已经将数据拷贝到位了;
  4. 解锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 快速路径:非阻塞(select)且队列为空,则直接返回
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
return true, false
}
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// 1. 对整个 channel 上锁
lock(&c.lock)

// 2. 检查 channel 是否已经被关闭,且缓存中没有数据
if c.closed != 0 {
// 缓冲区没有数据,返回 false
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
// 清除 ep 指向的数据
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
// 3. 如果没有关闭,且有阻塞中的发送者,则直接接收发送者的数据,然后唤醒它
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}

// 4. 没有等待中的 sender,且缓存中有数据,则时间从缓存队列中取出数据,并解锁返回
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

// 5. 非阻塞情况下,没有获取到数据,则返回 false
if !block {
unlock(&c.lock)
return false, false
}

// 6. 缓存中没有数据,则将自己包装成 sudog,放入 recvq 队列中,休眠等待唤醒
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg

mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}

gp.parkingOnChan.Store(true)
reason := waitReasonChanReceive
if c.bubble != nil {
reason = waitReasonSynctestChanReceive
}
// 6. 被唤醒,唤醒的时候,sender 已经将数据拷贝到 receiver 的 ep 所指向的位置了(也就是 chansend 的第 3 步)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2)

// 被唤醒,这个时候,数据已经接收到了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}

5. 关闭 closechan

  1. 设置 c.closed = 1
  2. 唤醒所有接收者(返回零值,success = false)
  3. 唤醒所有发送者(会 panic)
  4. 释放锁后再调用 goready,避免持锁调度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func closechan(c *hchan) {
// 1. 不能关闭 nil channel
if c == nil {
panic(plainError("close of nil channel"))
}
if c.bubble != nil && getg().bubble != c.bubble {
fatal("close of synctest channel from outside bubble")
}

// 2. 上锁
lock(&c.lock)

// 3. 不能重复关闭 channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

// 4. 设置关闭标识
c.closed = 1

var glist gList

// 5. 唤醒所有接收者(返回零值,success = false)
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}

// 6. 唤醒所有发送者(会 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// 7. 释放锁后再调用 goready,避免持锁调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}

6. 实践建议

通过上述对 channel 各种操作的源码分析,我们可以发现存在一些容易 panic 的点:

  • 往未初始化的 channel 发送数据,会 panic
  • 重复关闭 channel,会 panic
  • 往关闭的 channel 发送数据,会 panic

这里笔者总结了一些实践建议,供参考。

基于 go-channel.md 中解析的底层原理,Go channel 的 panic 风险主要源于对 hchan 状态的错误操作(特别是 closed 状态)。

以下是基于第一性原理(源码逻辑)总结的 channel 使用最佳实践:

6.1 核心原则:谁发送,谁关闭

这是避免 panic 的第一铁律。

  • 原理:源码中 chansend 会在检测到 c.closed != 0 时直接 panic。同时,closechan 唤醒被阻塞的发送者时,发送者被唤醒后检测到 channel 已关闭也会 panic。
  • 建议:只有发送端(Sender)才有资格关闭 channel。
    • 如果 channel 是由接收端(Receiver)关闭的,发送端无法感知,一旦再次发送就会 panic。
    • 如果有多个发送端:不要在发送端关闭 channel。应该使用一个额外的信号 channel(stop channel)或者是 sync.WaitGroup 来协调,或者让 channel 由 GC 自动回收(如果没有 goroutine 引用它)。

6.2 严禁重复关闭

  • 原理closechan 函数开头就会检查 c.closed,如果不为 0,会直接 panic "close of closed channel"。
  • 建议
    • 确保代码逻辑中 close() 只被执行一次。
    • 在复杂的多并发场景下,如果无法确定谁是最后一个关闭者,可以使用 sync.Once 来封装关闭操作,确保幂等性。

6.3 接收端使用 "comma, ok" 句式

  • 原理chanrecv 在 channel 已关闭且缓存无数据时,会返回对应类型的零值,并且返回的 success (即 ok) 为 false
  • 建议
    • 总是检查接收操作的第二个返回值:val, ok := <-ch
    • 如果 !ok,说明 channel 已关闭且已读完,应当退出接收循环,而不是继续处理零值。

6.4 避免关闭 nil channel

  • 原理closechan 第一步检查 if c == nil,如果是则 panic "close of nil channel"。
  • 建议
    • 在使用 channel 前确保它已被 make 初始化。
    • 小心处理结构体中的 channel 字段,确保它们不是默认的 nil 值。

6.5 优雅退出模式(Signal Channel)

当有多个发送者(N Senders)或 1 个接收者想停止多个发送者时,不要直接关闭数据 channel。

  • 原理:直接关闭会导致正在运行的发送者 panic。
  • 建议
    • 创建一个专门的 donestop channel(通常是 chan struct{})。
    • 接收者通过 close(done) 进行广播(利用了“从已关闭 channel 接收会立即返回零值”的特性)。
    • 发送者在 select 中同时监听 dataChdone,一旦 done 关闭,立即停止发送。