本篇将进入 Go 语言中关于锁的底层原理的探讨,笔者有幸阅读过 Mara Bos 的 《Rust Atomics and Locks》,该书对锁这一概念和底层原理进行了非常详尽的探讨,并且给出了 Rust 中 SpinLock、Mutex、RWMutex、Channel 和 Arc 等基础并发工具的手写实战案例,对于想更加深入理解并发编程尤其那些想手写并发工具的读者,非常推荐阅读该书。

特此声明,本篇是笔者基于 Go 1.25.3 版本源码、并与 Google Gemini 3Pro 共创所作,非常庆幸在当今 AI 时代下获取知识已是如此便利,且也为学习者从第一性原理理解所学知识大大降低了门槛。不过本篇的篇章安排和叙述逻辑,均由笔者把控和审阅,欢迎放心阅读。

结论先行

本篇我们将探讨 Go 语言中的各种"锁"的底层实现原理,包括 MutexRWMutexWaitGroupOnce 。它们都离不开两个核心基础:atomicsema

  • atomic 即原子变量,是一种硬件层面加锁的机制,可以保证基本类型在高并发下的并发安全性,实现原子操作。
  • sema 全称 semaphore,也叫信号锁 / 信号量锁,它的核心是一个 uint32 类型的值,含义是同时可并发的协程数量。在 Go 语言里面,每个 seam 背后都对应一个 semaRoot结构体。

我们先给出上述几种并发工具的简要概述,后文再进行详细阐述:

  • Mutex:互斥锁,只能有一个持有者。
    • 正常模式:得到锁返回,得不到锁自旋,自旋多了就饥饿。
    • 饥饿模式:不自选,直接入队等待。依次从队里唤醒协程并授予锁。
  • RWMutex:读写锁,只能一个写,可以同时多个读。
  • WaitGroup:一组协程等待另外一组协程全部执行完毕再执行。
  • Once:控制一段代码在并发中只执行一次。

1. Go 锁的两大基础

1.1 原子操作

Go 在 sync/atomic 包提供了一系列基本类型的原子操作,使用这些操作,可以保证基本类型在高并发下的并发安全性,实现原子操作。

  • SwapInt32
  • CompareAndSwapInt32
  • AddInt32
  • LoadInt32
  • StoreInt32
1
2
// AddInt32 atomically adds delta to *addr and returns the new value.
func AddInt32(addr *int32, delta int32) (new int32)

查看 AMD64 的汇编时,我们会发现其中有一个 LOCK 指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
// uint32 Xadd(uint32 volatile *val, int32 delta)
// Atomically:
// *val += delta;
// return *val;
TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVQ ptr+0(FP), BX
MOVL delta+8(FP), AX
MOVL AX, CX
LOCK
XADDL AX, 0(BX)
ADDL CX, AX
MOVL AX, ret+16(FP)
RET

可以再看一下 ARM64 的汇编代码,我们会发现其中有:LDADDALWLDAXRWSTLXRW 指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
TEXT ·Xadd(SB), NOSPLIT, $0-20
MOVD ptr+0(FP), R0
MOVW delta+8(FP), R1
#ifndef GOARM64_LSE
MOVBU internal∕cpu·ARM64+const_offsetARM64HasATOMICS(SB), R4
CBZ R4, load_store_loop
#endif
LDADDALW R1, (R0), R2
ADD R1, R2
MOVW R2, ret+16(FP)
RET
#ifndef GOARM64_LSE
load_store_loop:
LDAXRW (R0), R2
ADDW R2, R1, R2
STLXRW R2, (R0), R3
CBNZ R3, load_store_loop
MOVW R2, ret+16(FP)
RET
#endif

概括来说:

[!IMPORTANT]

原子操作的底层实现依赖于 86 的 lock 前缀或 ARM 的 LL/SC,而这二者又依赖于硬件级别的协同机制,其核心是通过 缓存一致性协议总线仲裁指令集层面的特殊支持 来保证多核环境下的原子性和内存顺序。

对于原子操作的底层原理和硬件层面的细节,感兴趣的读者可以阅读我这两篇笔记:

1.2 sema 锁

1.2.1 概述

  • sema 锁全称 semaphore,也叫信号锁 / 信号量锁。
  • sema 的核心是一个 uint32 类型的值,含义是同时可并发的协程数量。
  • 每一个 sema 锁都对应一个 semaRoot结构体。
  • semaRoot 中有一个平衡二叉树用于协程排队。

1.2.2 数据结构

internal/sync/mutex.go#L20 定义了 Mutex 的数据结构,如下:

1
2
3
4
5
6
7
// A Mutex is a mutual exclusion lock.
//
// See package [sync.Mutex] documentation.
type Mutex struct {
state int32
sema uint32
}

其中第二个元素 sema,便是一个 sema 锁,它本质上是一个 semaRoot 结构体的值。

semaRoot 定义在 runtime/sema.go#L40

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Asynchronous semaphore for sync.Mutex.

// A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem).
// Each of those sudog may in turn point (through s.waitlink) to a list
// of other sudogs waiting on the same address.
// The operations on the inner lists of sudogs with the same address
// are all O(1). The scanning of the top-level semaRoot list is O(log n),
// where n is the number of distinct addresses with goroutines blocked
// on them that hash to the given semaRoot.
// See golang.org/issue/17953 for a program that worked badly
// before we introduced the second level of list, and
// BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this.
type semaRoot struct {
lock mutex // 保护整个数据结构的锁
treap *sudog // Treap 的根节点
nwait atomic.Uint32 // 等待者数量(可无锁读取,用于快速判断)
}

// 简单理解:Runtime 内部专用锁
// 通过一个 uintptr 字段同时存储状态标志位和等待 M 线程的指针栈(低 10 位是状态,高位是 M 链表头),
// 直接用 OS 信号量阻塞 M 线程,禁用抢占,零分配,不触发 GC。
type mutex struct {
lockRankStruct
key uintptr
}

// sudog (pseudo-g) 代表的是一个在等待队列中的 goroutine
type sudog struct {
g *g // 指向等待的 goroutine

// === Treap 二叉树指针 ===
parent *sudog // 父节点
prev *sudog // 左子节点
next *sudog // 右子节点

// === 同地址等待链表 ===
waitlink *sudog // 链表中的下一个等待者
waittail *sudog // 链表尾部(只在头节点有效)

// === 地址和优先级 ===
elem unsafe.Pointer // 等待的地址(如 &mutex.sema)
ticket uint32 // Treap 的堆优先级(随机数)

// === 统计和性能分析 ===
waiters uint16 // 链表中其他等待者的数量(头节点)
acquiretime int64 // 开始等待的时间
releasetime int64 // 被唤醒的时间
}

1.2.3 操作

当 unit32 > 0 时,表示可以并发的协程个数

  • 获取锁:sema - 1, 获得锁成功
  • 释放锁:sema + 1,释放锁成功

当 unit32 = 0 时,表示没锁了,sema 锁退化成一个专用的休眠队列

  • 获取锁:进入堆树等待,协程休眠;
  • 释放锁:从堆树中取出一个协程并唤醒

1.2.4 semeacquire()

semaacuqire() 尝试递减计数器,失败则创建 sudog 加入等待队列并休眠,等待被唤醒。

  • sema > 0:sema --
  • sema = 0:将协程放入堆树中等待,并休眠

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

// 快捷路径,先进行一次简单的原子操作尝试获取锁,成功则直接返回
if cansemacquire(addr) {
return
}

// 慢速路径:
// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
// 获取 sema 底层的 semaRoot,并为它赋初始值
s := acquireSudog()
root := semtable.rootFor(addr)
for {
lockWithRank(&root.lock, lockRankRoot)
// 1. 自增等待者数量
root.nwait.Add(1)
// 2. 再次尝试获取锁,成功则可以返回了
if cansemacquire(addr) {
root.nwait.Add(-1)
unlock(&root.lock)
break
}
// 3. 还是失败,则进入休眠队列
root.queue(addr, s, lifo)
// 4. 调用 gopark() 休眠协程(不了解 gopark 可以先去了解一下 GMP 底层原理)
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}

// 判断是否可以获取 sema 锁
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
// sema 为 0,表示没锁了,这个时候是一个等待队列,无法直接获取锁
if v == 0 {
return false
}
// sema 大于 0,说明这个时候有可以并发的协程个数,尝试进行 cas 获取锁,成功则返回 true
if atomic.Cas(addr, v, v-1) {
return true
}
}
}

1.2.5 semarelease()

semarelease() 递增计数器,如果有等待者则从队列中取出一个 sudog 并唤醒对应的 goroutine,handoff 模式下直接移交锁并让出 CPU。

  • 无等待中的协程:直接返回
  • 有等待中的协程:从堆树中出队一个协程,唤醒,并调度到当前 P 的 runq 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// semrelease1 释放一个信号量,如果有等待者则唤醒一个 goroutine
// addr: 信号量地址(如 &mutex.sema)
// handoff: 是否直接移交(饥饿模式下为 true,直接把锁交给被唤醒者)
// skipframes: 用于性能分析时跳过的栈帧数
func semrelease1(addr *uint32, handoff bool, skipframes int) {
// 通过 addr 的哈希值找到对应的 semaRoot
root := semtable.rootFor(addr)

// ===== 关键操作:先递增信号量计数器 =====
// 无论有没有等待者,都原子地将 *addr 加 1
// 这相当于传统信号量的 V 操作,释放一个"资源"
atomic.Xadd(addr, 1)

// ===== Fast Path:快速检查是否有等待者 =====
//
// 注意顺序很重要!必须先执行 Xadd,再检查 nwait
// 原因:防止 "错过唤醒" 的竞态条件
// - semacquire 的顺序是:先 nwait++,再 cansemacquire,最后 gopark
// - 如果我们先检查 nwait 再 Xadd,可能在两步之间 semacquire 刚好 nwait++
// 导致我们误以为没有等待者而不唤醒,但 goroutine 已经睡眠了
if root.nwait.Load() == 0 {
return // 没有等待者,直接返回(信号量值已经被递增了)
}

// ===== Slow Path:有等待者,需要唤醒一个 =====
//
// 获取 semaRoot 的锁,保护等待队列的并发访问
lockWithRank(&root.lock, lockRankRoot)

// 持锁后再次检查 nwait,因为可能在获取锁之前已经被其他人消费了
if root.nwait.Load() == 0 {
// 场景:我们在获取锁期间,另一个 goroutine 可能:
// 1. 调用了 semacquire 并通过 cansemacquire 成功获取(消费了我们递增的值)
// 2. 减少了 nwait 计数
// 所以不需要再唤醒任何人了
unlock(&root.lock)
return
}

// 从等待队列中取出一个等待者(sudog)
// s: 要唤醒的 sudog
// t0: 当前时间(用于性能统计)
// tailtime: 队尾等待者的开始等待时间(用于计算平均等待时间)
s, t0, tailtime := root.dequeue(addr)

if s != nil {
// 成功取出一个等待者,减少等待者计数
root.nwait.Add(-1)
}

// 尽快释放锁,因为后续操作可能很慢(甚至可能让出 CPU)
unlock(&root.lock)
if s != nil {
// ===== Handoff 模式:直接移交锁 =====
if handoff && cansemacquire(addr) {
// handoff=true 表示饥饿模式
// 尝试提前消费信号量(将 *addr 减 1)
// 如果成功,设置 ticket=1,告诉被唤醒的 goroutine:
// "你不需要竞争了,锁已经是你的了"
s.ticket = 1
}

// ===== 唤醒 goroutine =====
// 将 sudog 对应的 goroutine 标记为可运行
// 并将其加入到当前 P 的 runnext 位置(优先运行)
readyWithTime(s, 5+skipframes)

// ===== Direct G Handoff:直接切换到被唤醒者 =====
if s.ticket == 1 && getg().m.locks == 0 && getg() != getg().m.g0 {
// 条件满足时,主动让出 CPU 给被唤醒者立即运行:
// - s.ticket == 1:已经直接移交了锁
// - getg().m.locks == 0:当前没有持有其他 runtime 锁
// - getg() != getg().m.g0:不在系统栈上
//
// readyWithTime 已经把被唤醒者放到了 P 的 runnext 位置
// 现在调用 goyield() 主动让出时间片:
// - 被唤醒者继承我们的时间片,立即开始运行
// - 避免高竞争场景下某个 goroutine 霸占 P
// - goyield 会把当前 G 放到本地队列(不是全局队列)
//
// 只在饥饿模式(handoff=true)下这样做,因为:
// - 正常模式:被唤醒者可能竞争失败,让出 CPU 会浪费
// - 饥饿模式:已经直接移交,保证能获取锁,不会浪费
//
// 相关讨论见 issue 33747
goyield()
}
}
}

1.2.6 深度理解

sema 是 Go sync.Mutex 连接 Go 运行时 (Runtime)操作系统 (OS) 的关键枢纽。 sema 就是用来解决"拿不到锁的 Goroutine 到底去了哪里、怎么睡、怎么醒"的关键问题。

我们要从以下三个层次由浅入深地理解 sema

  1. 数据结构层:它是怎么存储等待者的?
  2. 运行时层 (Runtime):Go 如何高效管理成千上万个锁?
  3. 操作系统层 (OS):底层的 futex 到底在做什么?

1.2.6.1 第一层:它在内存中是什么?

sync.Mutex 的定义中:

1
2
3
4
type Mutex struct {
state int32
sema uint32 // <--- 就是它
}

前面我们讨论过,sema 本质上只是一个内存地址(Address)

  • 作为 Key:Go 运行时并不关心 sema 变量里存的具体数值是多少(虽然它确实会变),运行时真正关心的是 &sema(这个变量在内存中的地址)。
  • 全局哈希表:Go 运行时维护了一个全局的哈希表(semTable),在这个表中:
    • Key &sema (Mutex 中 sema 字段的内存地址)。
    • Value 一个等待队列(平衡二叉树),里面躺着一个个正在睡觉的 Goroutine。
1
2
3
4
5
6
7
8
9
10
11
12
13
var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

为什么这么设计? 如果每个 Mutex 都向操作系统申请一个专门的内核信号量对象,开销太大了。Go 程序中可能有数百万个 Mutex,通过把它们映射到一个固定大小的全局哈希表中,Go 实现了极高的扩展性。

1.2.6.2 第二层:运行时调度 (GMP)

state 字段判断需要阻塞时,Go 会调用 runtime_SemacquireMutex(&m.sema, ...)(其实背后就是上面提到的 semacuqire())。这背后发生了什么?这是与 GMP 模型 交互的核心。

state 字段判断需要阻塞时,Go 会调用 runtime_SemacquireMutex(&m.sema, ...)。这背后发生了什么?这是与 GMP 模型 交互的核心。

1. 包装:从 G 到 Sudog

Goroutine (G) 是不能直接挂在链表上的。Go 使用了一个中间结构体叫 sudog

  • 当一个 G 需要阻塞时,运行时会创建一个 sudog,把这个 G 包装进去。
  • 这个 sudog 代表了"一个在特定信号量上等待的 G"。

2. 入队与休眠

  1. 计算哈希:根据 &sema 的地址,算出它在全局 semTable 中的位置。
  2. 挂载:把包装好的 sudog 挂到该位置的 Treap 尾部。
  3. 切出 (Park)
    • 调用 goparkunlock
    • 关键点:当前的 M (系统线程) 会断开与当前 G 的关系。
    • G 的状态从 Running 变为 Waiting
    • M 并没有睡觉,它会去 P (处理器) 的本地队列里找下一个可运行的 G 来执行。
    • 这就是 Go 高并发的精髓:用户层面的阻塞锁,并没有阻塞底层的系统线程(除非没有其他工作可做)。

3. 唤醒 (Handoff)

Unlock 调用 runtime_Semrelease(&m.sema) (即 semarelease())时:

  1. 查找:再次根据 &sema 地址去全局哈希表里找。
  2. 出队:取出链表头部的 sudog
  3. 调度
    • sudog 里的 G 取出来。
    • 将 G 的状态从 Waiting 改为 Runnable
    • 把它扔到当前 P 的运行队列或者全局运行队列中,等待被 M 执行。

1.2.6.3 第三层:操作系统原语

这就到了物理实现的底座了。如果 M 发现没有别的 G 可以执行了,或者 Go 运行时本身的某些同步需要,它最终必须依赖操作系统的能力来让 CPU 停下来。

在 Linux 平台上,sema 的底层实现依赖于 Futex (Fast Userspace Mutex)

Futex 是 Linux 内核提供的一种机制,它的核心理念是:即使需要内核介入,也要尽量减少陷入内核的次数。

它包含两个操作:

  1. User Space Check (用户态检查):先检查内存中的一个整数(就是 sema 的值)。如果条件满足(比如有信号),直接走人,完全不涉及内核。
  2. Kernel Wait (内核态等待):只有当条件不满足时,才发起系统调用(System Call),让内核把线程挂起。

runtime/os_linux.go 中,你会看到类似这样的汇编或封装调用:

  • 休眠 (futexsleep): 调用 futex(addr, FUTEX_WAIT, val, ...)。 意思就是:“内核老兄,请你看看 addr 这个内存地址的值是不是 val?如果是,就把我(当前线程 M)挂起;如果不是,说明中间有人改过(可能有信号了),那我就不睡了,直接返回。”
  • 唤醒 (futexwakeup): 调用 futex(addr, FUTEX_WAKE, count, ...)。 意思就是:“内核老兄,在这个地址上睡觉的线程,请帮我叫醒 count 个。”

关于 Futex 的更多细节,推荐阅读笔者整理的:Rust 原理丨操作系统并发原语

1.3 总结

atomic 和 sema 是 Go 并发的"阴阳二元":

atomic sema
哲学 乐观(假设无竞争) 悲观(接受竞争)
机制 硬件指令 OS/Runtime 调度
速度 极快(纳秒) 较慢(微秒)
能力 状态变更 休眠/唤醒
使用 所有路径 慢速路径
目标 性能 正确性 + 公平性

所有 Go 的同步原语都是这两者的不同组合方式,遵循 "Fast Path with Atomic, Slow Path with Semaphore" 的设计模式!🎯

用一句话总结就是:

[!IMPORTANT]

Atomic 提供无锁的快速状态管理(CAS、加减),sema 提供有竞争时的 goroutine 休眠/唤醒机制,两者组合实现"乐观尝试 + 悲观等待"的高效并发模型。

graph LR
    subgraph "性能层级"
        A[atomic
纳秒级
99% 场景] B[sema
微秒级
1% 竞争] end A -->|无竞争| Fast[Fast Path] A -->|低竞争
自旋| Spin[Spin] B -->|高竞争| Slow[Slow Path
休眠/唤醒] style A fill:#ccffcc style B fill:#e1f5ff style Fast fill:#90EE90 style Slow fill:#FFB6C1

2. sync.Mutex

2.1 概述

Go 语言的 sync.Mutex 是一种并发原语,旨在保证同一时间只有一个 Goroutine 可以访问共享资源,从而实现互斥(Mutual Exclusion)。它的底层实现是基于两个核心字段和一套复杂的自旋、排队和唤醒逻辑,以在性能和公平性之间取得平衡。

sync.Mutex 类型只有两个公开的指针方法:Lock()Unlock()

  • m.Lock():锁定当前的共享资源
  • m.Unlock():进行解锁

2.2 数据结构

前面我们已经展示过 sync.Mutex 的数据结构了:

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

Go 语言的 sync.Mutex 结构体非常精简,仅包含两个字段:

  • state (int32):这是一个 32 位整数,用于原子地表示互斥锁的当前状态。通过不同的位(Bit)来编码多种信息,实现了极高的效率。
  • sema (uint32):这是我们前面提到的 sema 锁,用于实现 Goroutine 的阻塞和唤醒机制。当 Goroutine 无法立即获取锁时,它会在该信号量上阻塞休眠,等待锁的持有者释放信号量将其唤醒。

如何理解这 2 个字段呢?在我看来:

  • state 字段是在用户态(User Space)解决"谁拿到锁"的逻辑。
  • sema 字段是用来解决"拿不到锁的 Goroutine 到底去了哪里、怎么睡、怎么醒"的物理问题。

2.3 state 字段

sema 前面已经介绍得非常清楚了,下面我们重点来分析一下 state 字段。

为了最大化性能,state 字段通过位运算存储了四个关键信息,这些信息共同决定了锁的运行模式和竞争程度:

位 (Bit) 含义 解释
0 Locked 1 表示已加锁,0 表示未加锁。
1 Woken 1 表示已有 Goroutine 被唤醒(正在尝试获取锁),此时不需要再唤醒其他人。
2 Starvation 1 表示进入饥饿模式(Go 1.9+ 引入的关键优化)。
3-31 WaiterCount 记录当前有多少个 Goroutine 在排队等待。

如下图所示:

1
2
3
4
5
6
7
 31                           3  2  1  0
┌─────────────────────────────┬──┬──┬──┐
│ 等待者数量 29 bits │S │W │L │
└─────────────────────────────┴──┴──┴──┘
│ │ └─ mutexLocked (锁定状态)
│ └──── mutexWoken (唤醒标志)
└─────── mutexStarving (饥饿模式)

使用一个 int32 来存储这么多信息有三大好处:

  1. 满足多个状态修改的原子性:所有状态必须在一个原子操作中一起更新,避免状态不一致。

    1
    2
    3
    4
    5
    6
    // 错误的设计(如果分开存储)
    mutex.locked = true // ← 这里可能被中断
    mutex.waiterCount++ // ← 状态不一致的窗口期

    // 正确的设计(单个原子操作)
    atomic.CompareAndSwapInt32(&m.state, old, new) // 一次性更新所有状态
  2. CPU Cache Line 效率:一个 int32 只占 4 字节,极度缓存友好,所有状态信息在同一个 cache line 中,读取/修改只需要一次内存访问,避免 false sharing。

  3. Fast Path 快速路径优化:在无竞争情况下,即 state == 0 表示完全空闲(无锁、无等待、无标志),一次 CAS 就能完成加锁,编译器可以内联这段代码,这是 99% 无竞争场景的关键优化。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
    race.Acquire(unsafe.Pointer(m))
    }
    return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()

state 的状态转换示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 初始状态
state = 0b00000000_00000000_00000000_00000000

// goroutine A 获取锁
state = 0b00000000_00000000_00000000_00000001 // L=1

// goroutine B 尝试获取,进入等待队列
state = 0b00000000_00000000_00000000_00001001 // L=1, waiter=1

// goroutine B 设置了 woken 标志(自旋中)
state = 0b00000000_00000000_00000000_00001011 // L=1, W=1, waiter=1

// 等待超过 1ms,进入饥饿模式
state = 0b00000000_00000000_00000000_00001101 // L=1, S=1, waiter=1

2.4 上锁

  • 正常模式:获得锁直接返回,得不到锁就自旋,自旋多次后进入 sema 队列中休眠,超过 1ms 就转为饥饿模式;
  • 饥饿模式:
    • 新来的协程不自旋,直接今年入 sema 队列中;
    • 依次从 sema 队列中唤醒协程,并直接获得锁,当 sema 队列为空时,跳回正常模式

上锁的源码位于 sync/mutex.go#L61,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
func (m *Mutex) lockSlow() {
// ===== 初始化局部变量 =====
var waitStartTime int64 // 开始等待的时间戳(用于判断是否饥饿)
starving := false // 当前 goroutine 是否处于饥饿状态
awoke := false // 当前 goroutine 是否从休眠中被唤醒
iter := 0 // 自旋迭代次数
old := m.state // 保存当前 mutex 的状态

// ===== 主循环:不断尝试获取锁 =====
for {
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 第一阶段:自旋尝试(Active Spinning)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 自旋条件:
// 1. old&mutexLocked != 0:锁已被持有
// 2. old&mutexStarving == 0:不在饥饿模式(饥饿模式下新来的不能竞争)
// 3. runtime_canSpin(iter):满足自旋条件(多核、迭代次数 < 4、有其他 P 等)
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 尝试设置 mutexWoken 标志,条件:
// 1. !awoke:我们还没设置过
// 2. old&mutexWoken == 0:当前没有其他 goroutine 设置
// 3. old>>mutexWaiterShift != 0:有等待者(不然设置 woken 没意义)
//
// 目的:告诉 Unlock "有人在自旋,不要唤醒休眠的 goroutine",减少不必要的唤醒开销
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}

// 执行实际的自旋(CPU 级别的忙等待)
runtime_doSpin()
iter++
old = m.state // 重新读取状态
continue // 继续下一轮尝试
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 第二阶段:准备新的状态值(CAS 更新)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

new := old // 基于旧状态构造新状态

// 如果不在饥饿模式,尝试设置 mutexLocked 位
// (在饥饿模式下,新来的 goroutine 不能直接抢锁,必须排队)
if old&mutexStarving == 0 {
new |= mutexLocked
}

// 如果锁已被持有或处于饥饿模式,增加等待者计数(即将进入等待队列)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 等待者数量 +1
}

// 如果当前 goroutine 已经饥饿(等待超过 1ms),并且锁还被持有
// 则尝试将 mutex 切换到饥饿模式
// 注意:只在锁被持有时切换,因为 Unlock 期望饥饿模式必有等待者
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}

// 如果当前 goroutine 是被唤醒的,需要清除 mutexWoken 标志
if awoke {
// 清除 mutexWoken 标志(用 &^ 位清除操作)
new &^= mutexWoken
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 第三阶段:CAS 更新状态
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

// 尝试用 CAS 将状态从 old 更新为 new
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// CAS 成功!

// 检查是否成功获取了锁
// 条件:旧状态既没锁定也不在饥饿模式
if old&(mutexLocked|mutexStarving) == 0 {
break // 成功获取锁,退出循环!
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 第四阶段:进入等待队列并休眠
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

// 如果之前已经等待过(被唤醒后重新竞争失败),插入队首(LIFO)
// 否则插入队尾(FIFO)
queueLifo := waitStartTime != 0

// 记录开始等待的时间(只记录一次)
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}

// 调用 semacuqire 进入休眠 <--- 阻塞在这里,直到被唤醒
runtime_SemacquireMutex(&m.sema, queueLifo, 2)

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 被唤醒了!从这里继续执行
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

// 检查是否应该进入饥饿模式
// 条件:之前已经饥饿 || 等待时间超过 1ms
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

// 重新读取当前状态
old = m.state

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 饥饿模式的特殊处理
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

if old&mutexStarving != 0 {
// 饥饿模式下被唤醒,说明锁被直接移交给我们了
// 但此时状态还不一致:
// - mutexLocked 还没设置(需要我们设置)
// - 我们还被计入等待者(需要减 1)


// 计算状态变化:
// +mutexLocked:设置锁定标志
// -1<<mutexWaiterShift:等待者数量减 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)

// 决定是否退出饥饿模式
// 条件:
// 1. 当前 goroutine 不再饥饿(等待时间 < 1ms)
// 2. 或者我们是最后一个等待者
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式(清除 mutexStarving 标志)
// 注意:必须在这里退出,考虑实际等待时间
// 饥饿模式效率低,如果不及时退出,两个 goroutine
// 可能会无限期地在饥饿模式下来回切换
delta -= mutexStarving
}

// 原子更新状态
atomic.AddInt32(&m.state, delta)
break // 成功获取锁,退出循环!
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 正常模式被唤醒:重新开始竞争
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

awoke = true // 标记为已唤醒
iter = 0 // 重置自旋计数器(可以重新自旋)

} else {
// CAS 失败:状态被其他 goroutine 改变了
// 重新读取状态,继续下一轮循环
old = m.state
}
}
// ===== 成功获取锁,退出循环 =====
}

关键步骤:

  1. 自旋(Spinning):在正常模式且满足条件时自旋等待
  2. 设置 mutexWoken:告诉 Unlock 不要唤醒其他 goroutine
  3. 更新等待者计数:增加 waiter 数量
  4. 进入信号量等待:调用 runtime_SemacquireMutex
  5. 饥饿模式切换:等待时间超过 1ms 切换到饥饿模式

自旋条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
const (
active_spin = 4 // referenced in proc.go for sync.Mutex implementation
active_spin_cnt = 30 // referenced in proc.go for sync.Mutex implementation
)

func internal_sync_runtime_canSpin(i int) bool {
// 必须同时满足:
// - 自旋次数 < 4
// - 多核 CPU(numCPU > 1)
// - 有其他运行的 P
// - 本地运行队列为空
if i >= active_spin || numCPUStartup <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}

2.5 解锁

  • 正常模式:解锁后新来的协程和 sema 队列中的协程一起竞争;
  • 饥饿模式:新来的协程直接入 sema 队列,依次从 sema 队列中唤醒协程并直接交付锁;

上锁的源码位于 sync/mutex.go#L202,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
func (m *Mutex) Unlock() {
// ===== Fast Path:快速路径(无竞争情况)=====
// 原子地将 state 减去 mutexLocked(即清除锁定标志位)
// 如果 mutex 完全空闲(无等待者、无其他标志),new 将等于 0
// 如果 new == 0,说明没有等待者,直接返回(最快路径)
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// new != 0 说明还有其他信息(等待者、标志位等)
// 需要进入慢速路径处理
m.unlockSlow(new)
}
}

// unlockSlow 是 Unlock 的慢速路径,处理有等待者或特殊标志的情况
// 参数 new:已经减去 mutexLocked 后的新状态值
func (m *Mutex) unlockSlow(new int32) {
// 不允许对未加锁的 mutex 进行 unlock!
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 分支 1:正常模式(Normal Mode)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if new&mutexStarving == 0 {
// 正常模式下的唤醒逻辑:不保证被唤醒者一定能获取锁
old := new

for {
// ===== 检查是否需要唤醒等待者 =====
// 以下任一条件满足,都无需唤醒:
// 1. old>>mutexWaiterShift == 0
// → 没有等待者
//
// 2. old&mutexLocked != 0
// → 锁已经被其他 goroutine 抢走了
// (在我们 Unlock 之后,有新来的 goroutine 直接 CAS 获取了锁)
//
// 3. old&mutexWoken != 0
// → 已经有一个 goroutine 被标记为唤醒状态
// (可能在自旋,或者已经被其他 Unlock 唤醒)
//
// 4. old&mutexStarving != 0
// → 进入饥饿模式了
// (虽然我们检查的是 new&mutexStarving == 0 才进这个分支,
// 但在循环中 old 可能被其他 goroutine 更新了)
// 饥饿模式有专门的处理逻辑,我们不应该干预
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return // 不需要唤醒,直接返回
}

// ===== 准备唤醒一个等待者 =====

// 构造新状态:
// 1. old - 1<<mutexWaiterShift:等待者数量减 1
// 2. | mutexWoken:设置 mutexWoken 标志
//
// mutexWoken 的作用:
// - 告诉正在 Lock 的 goroutine:"已经有人被唤醒了"
// - 避免多个 Unlock 重复唤醒
// - 被唤醒的 goroutine 会清除这个标志
new = (old - 1<<mutexWaiterShift) | mutexWoken

// 用 CAS 更新状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// CAS 成功!我们获得了唤醒的权利

// 调用 runtime 的信号量释放操作,唤醒一个等待者
// 参数:
// - &m.sema:信号量地址
// - false:handoff=false,正常模式,不直接移交
// 被唤醒的 goroutine 需要重新竞争锁
// - 2:skipframes(用于性能分析)
runtime_Semrelease(&m.sema, false, 2)
return
}

// CAS 失败:状态被其他 goroutine 改变了
// 重新读取状态,继续下一轮循环
old = m.state
}

// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 分支 2:饥饿模式(Starvation Mode)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
} else {
// 饥饿模式的特殊处理:
//
// 1. 直接移交所有权(handoff=true)
// - 被唤醒的 goroutine 保证能获取锁
// - 不需要重新竞争
//
// 2. mutexLocked 位不设置
// - 被唤醒的 goroutine 会自己设置
// - 见 lockSlow 中 old&mutexStarving != 0 分支
//
// 3. mutexStarving 标志保持
// - 新来的 goroutine 看到这个标志,知道不能竞争
// - 必须排队等待
//
// 4. 当前 goroutine 会主动让出 CPU(在 semrelease1 中 goyield)
// - 让被唤醒者立即运行
// - 避免延迟

// 调用 semarelease() 释放操作
// 参数:
// - &m.sema:信号量地址
// - true:handoff=true,饥饿模式,直接移交
// semrelease1 会设置 ticket=1,并调用 goyield()
// - 2:skipframes(用于性能分析)
runtime_Semrelease(&m.sema, true, 2)
}
}

关键步骤:

  1. 原子清除锁定位:atomic.AddInt32(&state, -mutexLocked),结果为 0 则直接返回
  2. 检查是否需要唤醒:无等待者/已有锁持有者/已有被唤醒者则跳过
  3. 正常模式:设置 mutexWoken 标志 + 减少等待者计数 + semrelease(handoff=false) 唤醒但需重新竞争
  4. 饥饿模式:semrelease(handoff=true) 直接移交所有权 + goyield() 让出 CPU

2.6 总结

3. sync.RWMutex

3.1 概述

  • 同时只能有一个 Goroutine 能够获得写锁
  • 同时可以有任意多个 Gorouinte 获得读锁
  • 同时只能存在写锁或读锁(读和写互斥)

sync.RWMutex 提供了 4 个方法:

  • rwm.RLock():上读锁
  • rwm.RUnlock():解读锁
  • rwm.Lock():上写锁
  • rwm.Unlock():解读锁

3.2 数据结构

sync.RWMutex 定义在 sync/rwmutex.go#L39,如下所示:

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}
  • w:写锁,拿到它直接有了上写锁的资格,有可能还需要等待读锁全部释放
  • writerSem:写协程等待队列
  • readerSem:读协程等待队列
  • readerCount:正值表示正值读的协程个数,负值表示加了写锁;
  • readerWait:上写锁应该等待读协程的个数

3.3 上写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const rwmutexMaxReaders = 1 << 30 // 最多的读者个数,是一个非常大的值

func (rw *RWMutex) Lock() {
// 1. 抢占获取写锁的资格
rw.w.Lock()
// Announce to readers there is a pending writer.
// 2. 原子变量 readerCount 减去 rwmutexMaxReaders 表明当前有写的需求,
// 阻止后续读锁的抢占,写者优先!
// 再加回去是要恢复原来的值,以得到抢锁之前正常读的协程的个数 r
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 3. 陷入 writerSem,等待 readerWait 个正在读的协程释放读锁
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
// 4. 抢锁成功
}

3.4 解写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (rw *RWMutex) Unlock() {
// 1. 把 rwmutexMaxReaders 加回去,表示已经没有写协程了
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
// 不允许对未上锁的锁进行 Unlock!
fatal("sync: Unlock of unlocked RWMutex")
}
// 2. 唤醒所有阻塞在 readerSem 中的读协程
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 3. 允许其他协程抢占写锁
rw.w.Unlock()
}

3.5 上读锁

1
2
3
4
5
6
7
8
func (rw *RWMutex) RLock() {
// 1. readerCount++,检查是否有写锁
if rw.readerCount.Add(1) < 0 {
// 2. 有写锁,则陷入 readerSem,等待写锁释放
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
// 3. 没有写锁或者写锁释放后唤醒 readerSem,则获得读锁成功
}

3.6 解读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (rw *RWMutex) RUnlock() {
// 1. 释放当前读锁,将 readerCount --
// 2. 检查是否有写协程正在等待
if r := rw.readerCount.Add(-1); r < 0 {
// 3. 如果有写协程等待,则往下走
rw.rUnlockSlow(r)
}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
// 4. readerWait--
// 5. 判断是否是最后一个释放读锁的协程
if rw.readerWait.Add(-1) == 0 {
// 6. 是的话,就从 writerSem 中唤醒写协程
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

3.7 总结

总的来说,Go 的 RWMutex 遵循的是写者优先(Writer Priority) 原则,防止写者饥饿。四个核心方法的要点总结如下:

  • 上写锁:竞争写锁,看看有无读协程:

    • 没有读协程的话直接获得写锁;

    • 有读协程的话,阻塞后来的读协程,等待当前读协程释放;

  • 解写锁:解写锁,唤醒 readerSem;

  • 上读锁:readerCount++,并检查是否有写锁:

    • 没有写锁,则上锁完毕;

    • 有写锁,则陷入 readerSem,等待写锁释放;

  • 解读锁:readerCount --,并检测是否有写协程被阻塞:

    • 无,则返回;

    • 有,则 readerWait --;判断是否是最后一个释放读锁的协程:

      • 不是,则返回;
      • 是,则唤醒 writerSem,解锁完毕;

4. sync.WaitGroup

4.1 概述

WaitGroup 等待一组 Goroutine 完成。主 Goroutine 调用 Add 来设置要等待的 Goroutine 的数量。然后每个 Goroutine 运行并在完成时调用 Done。同时,主 Goroutine 可以使用 Wait 来阻塞,直到所有 Goroutine 完成。

  • wg.Add(delta int):Add 将 delta(可能为负)添加到 WaitGroup 计数器。如果计数器变为 0,所有在 Wait 时阻塞的 Goroutine 将被释放。如果计数器变成负值,Add 会 panic。
  • wg.Done():当 WaitGroup 同步等待组中的某个 Goroutine 执行完毕后,设置这个 WaitGroup 的 counter 数值减 1。
  • wg.Wait():表示让当前的 Goroutine 等待,进入阻塞状态。一直到 WaitGroup 的计数器为 0,才能解除阻塞,这个 Goroutine 才能继续执行。

4.2 数据结构

sync.WaitGroup 源码位于 sync/waitgroup.go#L48

1
2
3
4
5
6
7
8
9
10
type WaitGroup struct {
noCopy noCopy // 防止拷贝

// Bits (high to low):
// bits[0:32] counter
// bits[32] flag: synctest bubble membership
// bits[33:64] wait count
state atomic.Uint64 // 核心状态字段(64位)
sema uint32 // sema 锁地址
}

重点是看 state 字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 63                    33  32  31                     0
┌─────────────────────┬───┬───┬─────────────────────┐
│ waiter count │ B │ 0 │ counter │
│ (31 bits) │ u │ │ (32 bits) │
│ │ b │ │ │
│ │ b │ │ │
│ │ l │ │ │
│ │ e │ │ │
└─────────────────────┴───┴───┴─────────────────────┘
bits[33:64] bit32 bits[0:32]

counter: 当前待完成的任务数(Add 增加,Done 减少)
bubble flag: synctest 相关(测试用)
waiter count: 有多少个 goroutine 在 Wait 中阻塞

为什么要用一个字段?

  1. 原子操作:可以用一次原子操作同时读写两个值
  2. 避免竞态:counter 和 waiter 总是一致的快照
  3. 零分配:整个 WaitGroup 只需 16 字节(8+4+padding)
1
2
3
state := wg.state.Load()
counter := int32(state >> 32) // 取高32位
waiter := uint32(state & 0x7fffffff) // 取低31位(忽略bubble flag)

4.3 wg.Wait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32) // counter

// 1. Fast Path: counter == 0,直接返回
if v == 0 {
return
}

// 2. Slow Path: counter > 0,需要等待
// 用 CAS 增加 waiter 计数
if wg.state.CompareAndSwap(state, state+1) {
// CAS 成功,进入等待
runtime_SemacquireWaitGroup(&wg.sema, false)
// 被唤醒后返回
return
}
// CAS 失败,重试
}
}

4.4 wg.Add()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (wg *WaitGroup) Add(delta int) {
// 1. 原子地将 delta 加到 counter(高32位)
state := wg.state.Add(uint64(delta) << 32)

// 2. 解析 state
v := int32(state >> 32) // counter
w := uint32(state & 0x7fffffff) // waiter count

// 3. 错误检查
if v < 0 {
panic("negative counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("Add called concurrently with Wait")
}

// 4. 快速返回:counter > 0 或没有 waiter
if v > 0 || w == 0 {
return
}

// 5. 关键时刻:counter 降到 0,且有 waiter 在等待
// → 唤醒所有 waiter!
wg.state.Store(0) // 重置状态
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0) // 唤醒一个
}
}

4.5 wg.Done()

1
2
3
4
func (wg *WaitGroup) Done() {
// 就是执行 counnter--
wg.Add(-1)
}

5. sync.Once

5.1 概述

sync.Once 可以让并发中的一段代码只执行一次;

  • once.Do(func):执行某一函数,该函数在多个协程中,只会被执行一次。

5.2 数据结构

sync.Once 的源码位于 sync/once.go#L20

1
2
3
4
5
6
type Once struct {
_ noCopy

done atomic.Bool
m Mutex
}
  • done:表示当前 once 是否已经执行过了;
  • m:锁

5.3 once.Do()

其实就一个简单的双重检测逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (o *Once) Do(f func()) {
// 如果已经执行过的了,直接返回
if !o.done.Load() {
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
// 上锁
o.m.Lock()
defer o.m.Unlock()
// 上锁后二次检查
if !o.done.Load() {
defer o.done.Store(true)
f()
}
}

6. sync.Cond

6.1 概述

从第一性原理来看,sync.Cond 解决的是轮询(Polling) vs 事件通知(Event Notification)的问题。 当你需要等待某个特定条件(比如"队列不为空"或"缓冲区有空位")满足时,你只有两种选择:

  1. 轮询 (Spinning):在一个死循环里不断加锁检查。
  2. 通知 (Cond):我去睡觉,等条件满足了,你把我叫醒。

Go 的 sync.Cond 实现非常独特,它没有直接使用操作系统层面的 Condition Variable(如 Pthread Cond),而是自己在 Runtime 层面实现了一套基于票号(Ticket)的通知队列

sync.Cond 提供了 3 个核心方法:

  • c.Wait():阻塞,等待条件发生
  • c.Signal():唤醒一个等待的协程
  • c.Broadcast():唤醒所有等待的协程

使用方式:

1
2
3
4
5
6
c.L.Lock()          // 1. 先加锁(保护条件 condition)
for !condition() { // 2. 必须用 for 循环检查(防止虚假唤醒)
c.Wait() // 3. 挂起(内部会:解锁 -> 睡 -> 加锁)
}
// 执行业务逻辑...
c.L.Unlock() // 4. 最终解锁

6.2 数据结构

sync.Cond 源码位于 sync/cond.go#L37

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Cond struct {
noCopy noCopy // 防止拷贝
L Locker // 关联的锁(通常是 *Mutex 或 *RWMutex)
notify notifyList // 等待队列(ticket-based)
checker copyChecker // 运行时拷贝检测
}

type notifyList struct {
wait atomic.Uint32 // 下一个等待者的票号(原子递增)
notify uint32 // 下一个要通知的票号

// 等待者列表
lock mutex
head *sudog
tail *sudog
}

理解 sync.Cond 的关键,在于理解它如何解决虚假唤醒消息丢失的问题。Go 使用了一种类似银行排号系统的逻辑。

1
2
3
4
wait = 5, notify = 2

当前排队: ticket 2, 3, 4 (还未通知)
即将排队: ticket 5, 6, 7... (新来的)

6.3 c.Wait()

当一个 Goroutine 调用 Wait() 时,发生了以下严密的步骤:

  1. 拿号 (Ticket Allocation): 调用 runtime_notifyListAdd。这本质上是一个原子操作,将 notifyList 中的 wait 计数器加 1,并返回当前的序列号(Ticket)。
  2. 解锁 (Unlock): 调用 c.L.Unlock()。必须先拿号,再解锁。这保证了即使你在解锁后、睡觉前,有人发送了信号,你的号也已经排进去了,不会错过通知。
  3. 睡觉 (Block): 调用 runtime_notifyListWait(Ticket),把自己挂起,等待有人喊"第 100 号"或者"所有人"醒来。
  4. 重新加锁 (Lock): 当被唤醒后,Wait 函数返回前,会自动调用 c.L.Lock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (c *Cond) Wait() {
// 步骤 1: 拿号 (Ticket Allocation)
// 关键点:在解锁之前先拿号!
// 这保证了即使我还没睡着,Signal 发送者也能知道"有一个持有 t 号的人正在赶来的路上"。
t := runtime_notifyListAdd(&c.notify)

// 步骤 2: 解锁 (Unlock User Lock)
// 必须解锁,否则 Signal 的发送者无法获得锁来修改条件,死锁。
c.L.Unlock()

// 步骤 3: 入队并休眠 (Enqueue & Park)
runtime_notifyListWait(&c.notify, t)

// 步骤 4: 重新加锁 (Relock)
// 醒来后,必须恢复到调用 Wait 前的状态,以便重新检查 for !condition()。
c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return l.wait.Add(1) - 1
}

func notifyListWait(l *notifyList, t uint32) {
lockWithRank(&l.lock, lockRankNotifyList)

// 进入 notifyListWait 后,再次检查一下 l.notify),
// 如果 l.notify > t,说明已经被叫过了,
// 那我就不睡了,直接返回。这完美解决了信号丢失问题。
if less(t, l.notify) {
unlock(&l.lock)
return
}

// 入队休眠
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
t0 := int64(0)
if blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
if t0 != 0 {
blockevent(s.releasetime-t0, 2)
}
releaseSudog(s)
}

6.4 c.Signal()

当调用 Signal() 时:

  1. 调用 runtime_notifyListNotifyOne
  2. 它会查找 notifyList最早那个还没被唤醒的 Ticket(比如第 99 号已醒,现在叫第 100 号)。
  3. 通过 sema(信号量)精确唤醒持有该 Ticket 的那个 Goroutine。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}

func notifyListNotifyOne(l *notifyList) {
// 如果 wait == notify,说明没有新的等待者,直接返回
if l.wait.Load() == atomic.Load(&l.notify) {
return
}

// 获取锁,因为需要修改 notifylist
lockWithRank(&l.lock, lockRankNotifyList)

// 双重检查,如果没有新的等待者,则直接返回
t := l.notify
if t == l.wait.Load() {
unlock(&l.lock)
return
}

// 更新下一个 notify 的票号
atomic.Store(&l.notify, t+1)

// 从 notifyList 尝试唤醒一个休眠中的 G
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
unlock(&l.lock)
s.next = nil
if s.g.bubble != nil && getg().bubble != s.g.bubble {
println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble")
fatal("semaphore wake of synctest goroutine from outside bubble")
}
// 唤醒
readyWithTime(s, 4)
return
}
}
unlock(&l.lock)
}

func readyWithTime(s *sudog, traceskip int) {
if s.releasetime != 0 {
s.releasetime = cputicks()
}
goready(s.g, traceskip)
}

6.5 c.Broadcast()

当调用 Broadcast() 时:

  1. 调用 runtime_notifyListNotifyAll
  2. 它不需一个一个叫,而是直接记下当前的 wait 计数器值(比如当前排到了 150 号)。
  3. 它会唤醒从"当前已唤醒号"到"150 号"之间的所有 Goroutine。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}

func notifyListNotifyAll(l *notifyList) {
// 没有新的等待者,直接返回
if l.wait.Load() == atomic.Load(&l.notify) {
return
}

// 上锁
lockWithRank(&l.lock, lockRankNotifyList)
// 清空 notifyList,因为全部都会被唤醒
s := l.head
l.head = nil
l.tail = nil

// 更新 notify 为当前的 wait
atomic.Store(&l.notify, l.wait.Load())
unlock(&l.lock)

// 唤醒旧的 notifyList 的所有 sudog
for s != nil {
next := s.next
s.next = nil
if s.g.bubble != nil && getg().bubble != s.g.bubble {
println("semaphore wake of synctest goroutine", s.g.goid, "from outside bubble")
fatal("semaphore wake of synctest goroutine from outside bubble")
}
// 唤醒
readyWithTime(s, 4)
s = next
}
}

6.6 总结

graph TB
    A[sync.Cond 核心机制]

    A --> B[Ticket 系统
wait & notify] A --> C[三步原子操作
Add→Unlock→Wait] A --> D[按序唤醒
FIFO] B --> E[防止丢失唤醒] C --> F[保证 happens-before] D --> G[公平性] style A fill:#ffcccc style B fill:#e1f5ff style C fill:#fff4e1 style D fill:#ccffcc

sync.Cond 的核心设计:

  • Ticket 系统:基于票号的通知机制,防止丢失唤醒
  • 三步原子操作:Add→Unlock→Wait,顺序不能错
  • 必须循环 Wait:防止虚假唤醒和竞态条件
  • 关联 Locker:Wait 自动释放和重新获取锁

7. 排查锁异常问题

7.1 锁拷贝 go vet

1
2
3
4
5
m := sync.Mutex{}
m.Lock()
n := m // n 拷贝 m
m.Unlock()
n.Lock() // 这里会报错,因为 n 在拷贝 m 的时候,把它已经 lock 的状态也拷贝了

这个时候,可以用 Go 提供的 go vet 工具来检查是否存在锁拷贝问题:

1
2
3
➜ go vet main.go
# command-line-arguments
./main.go:16:7: assignment copies lock value to n: sync.Mutex

go vet 还能检测可能的 bug 和可疑的构造。

7.2 数据竞争问题 - go build -race

1
2
3
4
5
6
7
8
9
10
11
12
// 此处 i 有并发问题
func add(i *int32) {
*i++
}

func main() {
c := int32(0)
for i := 0; i < 100; i++ {
go add(&c)
}
time.Sleep(time.Second)
}

这个时候,可以用 Go 提供的 go build -race 工具来检查是否存在数据竞争问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
➜  go build -race main.go
➜ ./main
==================
WARNING: DATA RACE
Read at 0x00c000124000 by goroutine 7:
main.add()
/Users/hedon-/goProjects/leetcode/go_advance/13-mutex/atomic/main.go:6 +0x3a

Previous write at 0x00c000124000 by goroutine 6:
main.add()
/Users/hedon-/goProjects/leetcode/go_advance/13-mutex/atomic/main.go:6 +0x4e

Goroutine 7 (running) created at:
main.main()
/Users/hedon-/goProjects/leetcode/go_advance/13-mutex/atomic/main.go:12 +0x84

Goroutine 6 (finished) created at:
main.main()
/Users/hedon-/goProjects/leetcode/go_advance/13-mutex/atomic/main.go:12 +0x84
==================
Found 1 data race(s)

7.3 死锁 go-deadlock

  • https://github.com/sasha-s/go-deadlock

8. 再次看 Go 锁的两大基础

在分析完 Go 的各种并发工具之后,相信不少读者都能理解为什么 atomic 和 sema 是 Go 锁的两大基础了。

graph TB
    subgraph "用户层并发工具"
        Mutex[sync.Mutex]
        RWMutex[sync.RWMutex]
        WaitGroup[sync.WaitGroup]
        Cond[sync.Cond]
        Once[sync.Once]
        Pool[sync.Pool]
        Chan[Channel]
    end

    subgraph "Runtime 基础原语"
        Atomic[Atomic 原子操作]
        Sema[Semaphore
sleep/wakeup] end Mutex --> Atomic Mutex --> Sema RWMutex --> Atomic RWMutex --> Sema WaitGroup --> Atomic WaitGroup --> Sema Cond --> Sema Once --> Atomic Pool --> Atomic Chan --> Atomic Chan --> Sema style Atomic fill:#ffcccc style Sema fill:#e1f5ff

还是前面那句话:

[!IMPORTANT]

atomic 提供无锁的快速状态管理(CAS、加减),sema 提供有竞争时的 goroutine 休眠/唤醒机制,两者组合实现"乐观尝试 + 悲观等待"的高效并发模型。

graph LR
    subgraph "性能层级"
        A[atomic
纳秒级
99% 场景] B[sema
微秒级
1% 竞争] end A -->|无竞争| Fast[Fast Path] A -->|低竞争
自旋| Spin[Spin] B -->|高竞争| Slow[Slow Path
休眠/唤醒] style A fill:#ccffcc style B fill:#e1f5ff style Fast fill:#90EE90 style Slow fill:#FFB6C1

这里笔者再次梳理下各个并发工具的如何运用 atomic 和 sema 的:

  • sync.Mutex

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    type Mutex struct {
    state int32 // ← Atomic 操作的目标
    sema uint32 // ← Semaphore 使用的地址
    }

    // Lock 流程:
    // 1. atomic.CAS(state, 0, 1) ← Atomic 快速路径
    // 2. 失败 → 自旋 + atomic 操作 ← Atomic 重试
    // 3. 还失败 → semacquire(&sema) ← Semaphore 休眠

    // Unlock 流程:
    // 1. atomic.Add(state, -1) ← Atomic 快速路径
    // 2. 有等待者 → semrelease(&sema) ← Semaphore 唤醒
  • sync.RWMutex

    1
    2
    Atomic: 管理 state(锁定/唤醒/饥饿/等待者)
    Sema: 竞争时休眠/唤醒
  • sync.WaitGroup

    1
    2
    Atomic: 管理 reader 计数和 writer 等待标志
    Sema: writer 等待、reader 等待(两个独立的 sema)
  • sync.Once

    1
    2
    Atomic: 管理缓冲区索引、状态标志
    Sema: 发送/接收阻塞时休眠/唤醒
  • sync.Cond

    1
    2
    Atomic: 管理计数器(Add/Done)
    Sema: Wait() 时如果计数 > 0 则休眠
  • Channel

    1
    2
    Atomic: (底层 Mutex 用)
    Sema: Wait() 休眠,Signal/Broadcast 唤醒