系列文章:
- Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序
- Rust 原理丨从汇编角度看原子操作
- Rust 实战丨手写一个 SpinLock
- Rust 实战丨手写一个 oneshot channel
- Rust 实战丨手写一个 Arc
- Rust 原理丨操作系统并发原语
- Rust 实战丨手写一个 Mutex 👈 本篇
- Rust 实战丨手写一个 Condvar
- Rust 实战丨手写一个 RwLock
继上篇 Rust
原理丨操作系统并发原语,我们学习了不同操作系统下的并发原语实现,理解了它们最重要的贡献就是提供了一套
wait/wake_one/wake_all
的机制。本篇,我们将借助 Rust Atomics and Locks 的作者 Mara Bos 封装的 atomic-wait
crate,来手写一个自己的 Mutex
!
v1:基本实现
首先我们来思考一下如何定义数据结构:
- 我们需要 1 个原子变量 state 来记录锁的状态(0:
unlocked, 1: locked),因为
atomic-wait
只支持 AtomicU32,所以这里我们的类型也定义为 AtomicU32。 - 另外我们需要一个 value
字段来保存数据,当抢到锁的时候,是可以对 value
进行修改的,但是这个时候只有共享引用,所以我们需要
UnsafeCell
来提供内部可变性。
同时,贯彻 RAII(Resource Acquisition Is Initialization,资源获取即初始化)原则:
- 我们在
lock(&self)
成功时返回一个MutexGuard
,它包含&Mutex
。 - 在
MutexGuard
drop 的时候,我们将 state 重置为 0,表示释放锁,并唤醒一个潜在的阻塞线程。
为了让 Mutex
可以在线程之间共享,我们需要为其实现
Sync
trait,而又因为 Mutex
实现的是独占访问,上锁成功的线程是拥有 T 的所有权的,即要求 T
可以在线程中转移,即要求 T 需要实现 Send
trait。
综上,我们定义的 Mutex
和 MutexGuard
结构如下:
1 | pub struct Mutex<T> { |
为了方面访问内部数据,我们为 MutexGuard
实现
Deref
和 DerefMut
这 2 个 trait:
1 | impl<T> Deref for MutexGuard<'_, T> { |
当 MutexGuard
离开作用域的时候,即被 drop
的时候,我们需要释放锁,并调用 wake_one
去唤醒一个潜在的阻塞线程:
1 | impl<T> Drop for MutexGuard<'_, T> { |
这里将 state
设置为 0,使用的内存顺序是
Release
,是为了跟 lock
的时候使用
Acquire
建立 happens-before 原则,确保 state
的真实值在各个线程中都是可见的。
在 lock
的时候,我们需要将 state
从 0
替换为 1,如果成功,则说明上锁成功,直接返回
MutexGuard,如果失败,则说明锁已经被抢占了,这个时候我们使用
atomic-wait
的 wait()
陷入休眠,等待
wake_one
信号唤醒,再尝试抢锁。
1 | impl<T> Mutex<T> { |
至此,我们第一个版本的 Mutex
就完工了!是不是很简单!我们来写 2
个单元测试验证一下基本逻辑是否正确:
1 |
|
运行成功:
1 | running 2 tests |
v2:减少系统调用
当 MutexGuard
的时候,我们将 state
置为
0,并调用 wake_one
唤醒一个潜在的线程,这个时候如果没有阻塞中的线程的话,那这个系统调用就比较浪费了。
所以在 v2 版本我们尝试来优化这一点。为此,我们需要扩展我们的
state
字段,新增表示是否有阻塞线程的能力。
1 | pub struct Mutex<T> { |
修改了 state
的定义后我们需要修改上锁和解锁的逻辑,在上锁的时候,我们先尝试将
state
从 0
置为
1
,如果成功了,说明抢到了锁,否则,我们将
state
置为 2
,表示有线程被阻塞了。
这里书中的实现是这样的:
1 | impl<T> Mutex<T> { |
lock()
:
- 如果成功将
state
从0
变换为1
,则说明当前线程抢锁成功,直接返回MutexGuard
。 - 如果失败了,就将
state
置为2
,然后调用wait
进入休眠。
unlock()
:
- 将
state
置为0
,如果之前是2
的话,那就说明有线程被阻塞着,这个时候才调用wake_one
唤醒一个阻塞的线程。
这个地方,笔者觉得有一些问题, state.swap(2, Acquire)
这一行代码会无条件将 state
置为
2
,也就是说,当这个线程抢到锁后,它在 unlock()
的时候,无论有没有在阻塞的线程,这个时候 state
都是
2
,所以都会调用 wake_one
。
sequenceDiagram participant A as 线程A (持有锁) participant B as 线程B (尝试获锁) participant State as Mutex State participant System as 系统调用 Note over State: state = 1 (线程A持有锁) B->>State: compare_exchange(0, 1) State-->>B: 失败 (返回 1) Note over B: 进入 while 循环 B->>State: swap(2) Note over State: state: 1 → 2 State-->>B: 返回 1 (≠ 0) B->>System: wait(&state, 2) Note over B: 线程B进入等待状态 Note over A: 线程A释放锁 A->>State: swap(0) Note over State: state: 2 → 0 State-->>A: 返回 2 A->>System: wake_one() Note over System: 正确的唤醒!线程B确实在等待 Note over B: 线程B被唤醒,继续循环 B->>State: swap(2) Note over State: state: 0 → 2 State-->>B: 返回 0,退出循环 Note over B: 获得锁,但state=2 (问题所在) Note over B: 使用锁... B->>State: unlock() - swap(0) Note over State: state: 2 → 0 State-->>B: 返回 2 B->>System: wake_one() Note over System: 不必要的调用!此时没有等待者
我们可以运行上面的测试用例
cross_thread_should_work
,可以看到输出了 2 个
wake_one,但是通过分析,应该只需要调用一次 wake_one
就足够了。
1 | test mutex::tests::cross_thread_should_work ... ok |
笔者的实现如下:
1 | fn lock_contended(state: &AtomicU32) { |
- 我们获取
state.compare_exchange(0,1)
的返回值,如果成功,说明抢到锁,直接返回。 - 如果失败了:
- 原始值是
1
,那我们就尝试将state
从1
交换为2
,然后调用wait
陷入休眠。 - 原始值是
2
,说明已经有别的线程也被阻塞了,这个时候直接调用wait
陷入休眠。
- 原始值是
- 当被
wake_one
唤醒时,重新执行state.compare_exchange(0,1)
抢占锁。
这个新的流程中,我们抢到锁的时候,state
会被正确的设置为
1
而不是 2
,这个时候,在 drop
的时候就不会有不必要的 wake_one
的调用了。
sequenceDiagram participant A as 线程A (持有锁) participant B as 线程B (尝试获锁) participant State as Mutex State participant System as 系统调用 Note over State: state = 1 (线程A持有锁) B->>State: compare_exchange(0, 1) State-->>B: 失败,返回 s=1 Note over B: s == 1,尝试设置等待者标志 B->>State: compare_exchange(1, 2) Note over State: state: 1 → 2 State-->>B: 成功 B->>System: wait(&state, 2) Note over B: 线程B进入等待状态 Note over A: 线程A完成工作,释放锁 A->>State: unlock() - swap(0) Note over State: state: 2 → 0 State-->>A: 返回 2 A->>System: wake_one() Note over System: 正确唤醒线程B Note over B: 线程B被唤醒,重新尝试获取锁 B->>State: compare_exchange(0, 1) Note over State: state: 0 → 1 (关键!) State-->>B: 成功!退出循环 Note over B: 🎯 获得锁,state=1 (正确状态) Note over B: 使用锁进行工作... B->>State: unlock() - swap(0) Note over State: state: 1 → 0 State-->>B: 返回 1 (不是2!) Note over B: ✅ 返回值是1,不调用wake_one Note over System: 🎯 避免了不必要的系统调用
我们重新运行上面的测试用例
cross_thread_should_work
,可以看到只输出了 1 个
wake_one:
1 | test mutex::tests::cross_thread_should_work ... ok |
不过笔者在做 benchmark 后发现书中的实现性能其实更高,在 macbook m2max 机器上,书中的版本要比我的版本快 5~10% 左右,猜测大概率是
swap
的性能要比compare_exchange
高。
v3:短暂自旋进一步避免系统调用
还有一种潜在的优化是,我们可以在抢锁失败且返回 state
为
1
的时候,进行短暂的自旋,如果实际场景中占用锁的时间非常短,那我们就可以再省略一次
wake
的系统调用了。
不过值得注意的是,这种优化未必是正向的,一方面,如果锁占用时间比较长,那前面的自旋就白白浪费了,另一方面,自旋的次数带来的性能消耗,未必就比系统调用要小(不同的平台表现可能很不一样)。
书中给出的经验值是自选 100 次。
优化后的 lock_contended
如下:
1 | fn lock_contended(state: &AtomicU32) { |
感兴趣的读者可以使用 criterion 做一个 benchmark 看看自旋与之前的版本的性能差异有多少。
完整代码可参考:conutils/mutex。
总结
在本篇中,我们从零开始,结合 Rust 原子操作和内存顺序的核心知识,实现一个 Rust 中的 Mutex 锁,逐步揭示 Mutex 背后的等待与唤醒机制,为更好理解标准库中的 Mutex 奠定了良好的基础。下篇,我们将尝试手写一个条件变量 Condition Variable!
Happy Coding! Peace~