继上篇 Go
底层原理丨锁 ,本篇将进入 Go
语言中关于通道(channel)底层原理的探讨。在 Rust 中,笔者参考 Mara Bos
的 《Rust Atomics and Locks》
实现了一个 oneshot channel,感兴趣的读者也可以参阅 Rust
实战丨手写一个 oneshot channel 。
特此声明,本篇是笔者基于 Go 1.25.3 版本源码、并与 Google Gemini 3Pro
共创所作,非常庆幸在当今 AI
时代下获取知识已是如此便利,且也为学习者从第一性原理理解所学知识大大降低了门槛。不过本篇的篇章安排和叙述逻辑,均由笔者把控和审阅,欢迎放心阅读。
结论先行
channel 底层是 hchan 结构体,包含了:
一个环形缓存队列;
接受者队列、发送者队列;
锁;
关闭标志;
发送:chansend()
接收:chanrecv()
1. 数据结构 hchan
channel 的底层数据结构 hchan 源码位于 runtime/chan.go#L34 ,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 timer *timer elemtype *_type sendx uint recvx uint recvq waitq sendq waitq bubble *synctestBubble lock mutex }
其中以下五个字段组成了一个 环形缓冲队列 :
1 2 3 4 5 qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 elemtype *_type
环形缓存可以大幅降低 GC 的开销。
其中还有四个字段组成了两个 链表 :
1 2 3 4 sendx uint recvx uint recvq waitq sendq waitq
还有一个 锁 :
互斥锁并不是排队发送 / 接收数据,它保护的是 hchan 结构体本身。
还有一个标记:
2. 创建 makechan
创建 channel 的逻辑位于 runtime/chan.go#L75 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 func makechan (t *chantype, size int ) *hchan { elem := t.Elem var c *hchan switch { case mem == 0 : c = (*hchan)(mallocgc(hchanSize, nil , true )) c.buf = c.raceaddr() case !elem.Pointers(): c = (*hchan)(mallocgc(hchanSize+mem, nil , true )) c.buf = add(unsafe.Pointer(c), hchanSize) default : c = new (hchan) c.buf = mallocgc(mem, elem, true ) } c.elemsize = uint16 (elem.Size_) c.elemtype = elem c.dataqsiz = uint (size) if b := getg().bubble; b != nil { c.bubble = b } lockInit(&c.lock, lockRankHchan) return c }
策略 1:无缓冲/零大小元素
1 2 case mem == 0 : c = (*hchan)(mallocgc(hchanSize, nil , true ))
策略 2:元素不含指针
1 2 3 case !elem.Pointers(): c = (*hchan)(mallocgc(hchanSize+mem, nil , true )) c.buf = add(unsafe.Pointer(c), hchanSize)
策略 3:元素含指针
1 2 3 default : c = new (hchan) c.buf = mallocgc(mem, elem, true )
mallocgc 的第二个参数 elem 告诉 GC 这块内存的类型
GC 需要递归扫描 buf 中的每个元素,查找其中的指针
如果用策略 2,GC 无法正确追踪 buf
中的指针,导致对象被错误回收
示例:
1 2 3 4 5 type Node struct { Value int Next *Node } ch := make (chan *Node, 10 )
3. 发送 chansend
对整个 channel 上锁;
检查 channel 是否已经关闭,若关闭,这 panic;
检查是否有正在等待中的协程:
有的话,直接将数据拷贝给它,然后唤醒它;
没有,则检查缓存队列是否已满:
没有满,则将数据塞入缓存队列中;
已满,则把自己包装成 sudog 放入 sendq
队列,休眠并解锁,等待唤醒。被唤醒后数据已经被取走了,当下 sudog
负责维护其他的数据;
解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 func chansend (c *hchan, ep unsafe.Pointer, block bool , callerpc uintptr ) bool { if !block && c.closed == 0 && full(c) { return false } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("send on closed channel" )) } if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true } if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true ) reason := waitReasonChanSend if c.bubble != nil { reason = waitReasonSynctestChanSend } gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2 ) KeepAlive(ep) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup" ) } panic (plainError("send on closed channel" )) } return true }
4. 接收 chanrecv
对整个 channel 上锁;
如果 channel 已经关闭,且缓存中没有数据,如果这个时候 eq
指向的地址有数据,则清空数据;
检查是否有等待中的 sender:
有,则看 channel 有无缓存:
没有,则直接从 sender 中取走数据,唤醒 sender;
有,则说明缓存已满,从缓存队列队头取走数据,然后将 sender
数据塞到队尾,唤醒 sender;
无,则看 channel 有无缓存:
有,则直接从缓存中取走数据,维护队列索引,解锁返回;
无,则将自己包装成 sudog,放入 recvq
休眠等待唤醒,被唤醒的时候,sender 已经将数据拷贝到位了;
解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 func chanrecv (c *hchan, ep unsafe.Pointer, block bool ) (selected, received bool ) { if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { return true , false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) if c.closed != 0 { if c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true , false } } else { if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func () { unlock(&c.lock) }, 3 ) return true , true } } if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil ) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true , true } if !block { unlock(&c.lock) return false , false } gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) if c.timer != nil { blockTimerChan(c) } gp.parkingOnChan.Store(true ) reason := waitReasonChanReceive if c.bubble != nil { reason = waitReasonSynctestChanReceive } gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanRecv, 2 ) if mysg != gp.waiting { throw("G waiting list is corrupted" ) } if c.timer != nil { unblockTimerChan(c) } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2 ) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true , success }
5. 关闭 closechan
设置 c.closed = 1
唤醒所有接收者(返回零值,success = false)
唤醒所有发送者(会 panic)
释放锁后再调用 goready,避免持锁调度
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 func closechan (c *hchan) { if c == nil { panic (plainError("close of nil channel" )) } if c.bubble != nil && getg().bubble != c.bubble { fatal("close of synctest channel from outside bubble" ) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic (plainError("close of closed channel" )) } c.closed = 1 var glist gList for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock) for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3 ) } }
6. 实践建议
通过上述对 channel 各种操作的源码分析,我们可以发现存在一些容易 panic
的点:
往未初始化的 channel 发送数据,会 panic
重复关闭 channel,会 panic
往关闭的 channel 发送数据,会 panic
这里笔者总结了一些实践建议,供参考。
基于 go-channel.md 中解析的底层原理,Go channel 的 panic
风险主要源于对 hchan 状态的错误操作(特别是
closed 状态)。
以下是基于第一性原理(源码逻辑)总结的 channel 使用最佳实践:
6.1 核心原则:谁发送,谁关闭
这是避免 panic 的第一铁律。
原理 :源码中 chansend 会在检测到
c.closed != 0 时直接 panic。同时,closechan
唤醒被阻塞的发送者时,发送者被唤醒后检测到 channel 已关闭也会
panic。
建议 :只有发送端 (Sender)才有资格关闭
channel。
如果 channel
是由接收端(Receiver)关闭的,发送端无法感知,一旦再次发送就会
panic。
如果有多个发送端 :不要在发送端关闭
channel。应该使用一个额外的信号 channel(stop channel)或者是
sync.WaitGroup 来协调,或者让 channel 由 GC
自动回收(如果没有 goroutine 引用它)。
6.2 严禁重复关闭
原理 :closechan 函数开头就会检查
c.closed,如果不为 0,会直接 panic "close of closed
channel"。
建议 :
确保代码逻辑中 close() 只被执行一次。
在复杂的多并发场景下,如果无法确定谁是最后一个关闭者,可以使用
sync.Once 来封装关闭操作,确保幂等性。
6.3 接收端使用 "comma, ok" 句式
原理 :chanrecv 在 channel
已关闭且缓存无数据时,会返回对应类型的零值,并且返回的
success (即 ok) 为 false。
建议 :
总是检查接收操作的第二个返回值:val, ok := <-ch。
如果 !ok,说明 channel
已关闭且已读完,应当退出接收循环,而不是继续处理零值。
6.4 避免关闭 nil channel
原理 :closechan 第一步检查
if c == nil,如果是则 panic "close of nil channel"。
建议 :
在使用 channel 前确保它已被 make 初始化。
小心处理结构体中的 channel 字段,确保它们不是默认的 nil 值。
6.5 优雅退出模式(Signal
Channel)
当有多个发送者(N Senders)或 1
个接收者想停止多个发送者时,不要直接关闭数据 channel。
原理 :直接关闭会导致正在运行的发送者 panic。
建议 :
创建一个专门的 done 或 stop
channel(通常是 chan struct{})。
接收者通过 close(done) 进行广播(利用了“从已关闭
channel 接收会立即返回零值”的特性)。
发送者在 select 中同时监听 dataCh 和
done,一旦 done 关闭,立即停止发送。