系列文章:
- Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序
- Rust 原理丨从汇编角度看原子操作
- Rust 实战丨手写一个 SpinLock
- Rust 实战丨手写一个 oneshot channel
- Rust 实战丨手写一个 Arc
- Rust 原理丨操作系统并发原语
- Rust 实战丨手写一个 Mutex 👈 本篇
- Rust 实战丨手写一个 Condvar
- Rust 实战丨手写一个 RwLock
继上篇 Rust
原理丨操作系统并发原语,我们学习了不同操作系统下的并发原语实现,理解了它们最重要的贡献就是提供了一套
wait/wake_one/wake_all 的机制。本篇,我们将借助 Rust Atomics and Locks 的作者 Mara Bos 封装的 atomic-wait
crate,来手写一个自己的 Mutex!
v1:基本实现
首先我们来思考一下如何定义数据结构:
- 我们需要 1 个原子变量 state 来记录锁的状态(0:
unlocked, 1: locked),因为
atomic-wait只支持 AtomicU32,所以这里我们的类型也定义为 AtomicU32。 - 另外我们需要一个 value
字段来保存数据,当抢到锁的时候,是可以对 value
进行修改的,但是这个时候只有共享引用,所以我们需要
UnsafeCell来提供内部可变性。
同时,贯彻 RAII(Resource Acquisition Is Initialization,资源获取即初始化)原则:
- 我们在
lock(&self)成功时返回一个MutexGuard,它包含&Mutex。 - 在
MutexGuarddrop 的时候,我们将 state 重置为 0,表示释放锁,并唤醒一个潜在的阻塞线程。
为了让 Mutex 可以在线程之间共享,我们需要为其实现
Sync trait,而又因为 Mutex
实现的是独占访问,上锁成功的线程是拥有 T 的所有权的,即要求 T
可以在线程中转移,即要求 T 需要实现 Send trait。
综上,我们定义的 Mutex 和 MutexGuard
结构如下:
1 | pub struct Mutex<T> { |
为了方面访问内部数据,我们为 MutexGuard 实现
Deref 和 DerefMut 这 2 个 trait:
1 | impl<T> Deref for MutexGuard<'_, T> { |
当 MutexGuard 离开作用域的时候,即被 drop
的时候,我们需要释放锁,并调用 wake_one
去唤醒一个潜在的阻塞线程:
1 | impl<T> Drop for MutexGuard<'_, T> { |
这里将 state 设置为 0,使用的内存顺序是
Release,是为了跟 lock 的时候使用
Acquire 建立 happens-before 原则,确保 state
的真实值在各个线程中都是可见的。
在 lock 的时候,我们需要将 state 从 0
替换为 1,如果成功,则说明上锁成功,直接返回
MutexGuard,如果失败,则说明锁已经被抢占了,这个时候我们使用
atomic-wait 的 wait() 陷入休眠,等待
wake_one 信号唤醒,再尝试抢锁。
1 | impl<T> Mutex<T> { |
至此,我们第一个版本的 Mutex
就完工了!是不是很简单!我们来写 2
个单元测试验证一下基本逻辑是否正确:
1 |
|
运行成功:
1 | running 2 tests |
v2:减少系统调用
当 MutexGuard 的时候,我们将 state 置为
0,并调用 wake_one
唤醒一个潜在的线程,这个时候如果没有阻塞中的线程的话,那这个系统调用就比较浪费了。
所以在 v2 版本我们尝试来优化这一点。为此,我们需要扩展我们的
state
字段,新增表示是否有阻塞线程的能力。
1 | pub struct Mutex<T> { |
修改了 state
的定义后我们需要修改上锁和解锁的逻辑,在上锁的时候,我们先尝试将
state 从 0 置为
1,如果成功了,说明抢到了锁,否则,我们将
state 置为 2,表示有线程被阻塞了。
这里书中的实现是这样的:
1 | impl<T> Mutex<T> { |
lock():
- 如果成功将
state从0变换为1,则说明当前线程抢锁成功,直接返回MutexGuard。 - 如果失败了,就将
state置为2,然后调用wait进入休眠。
unlock():
- 将
state置为0,如果之前是2的话,那就说明有线程被阻塞着,这个时候才调用wake_one唤醒一个阻塞的线程。
这个地方,笔者觉得有一些问题, state.swap(2, Acquire)
这一行代码会无条件将 state 置为
2,也就是说,当这个线程抢到锁后,它在 unlock()
的时候,无论有没有在阻塞的线程,这个时候 state 都是
2,所以都会调用 wake_one。
sequenceDiagram
participant A as 线程A (持有锁)
participant B as 线程B (尝试获锁)
participant State as Mutex State
participant System as 系统调用
Note over State: state = 1 (线程A持有锁)
B->>State: compare_exchange(0, 1)
State-->>B: 失败 (返回 1)
Note over B: 进入 while 循环
B->>State: swap(2)
Note over State: state: 1 → 2
State-->>B: 返回 1 (≠ 0)
B->>System: wait(&state, 2)
Note over B: 线程B进入等待状态
Note over A: 线程A释放锁
A->>State: swap(0)
Note over State: state: 2 → 0
State-->>A: 返回 2
A->>System: wake_one()
Note over System: 正确的唤醒!线程B确实在等待
Note over B: 线程B被唤醒,继续循环
B->>State: swap(2)
Note over State: state: 0 → 2
State-->>B: 返回 0,退出循环
Note over B: 获得锁,但state=2 (问题所在)
Note over B: 使用锁...
B->>State: unlock() - swap(0)
Note over State: state: 2 → 0
State-->>B: 返回 2
B->>System: wake_one()
Note over System: 不必要的调用!此时没有等待者
我们可以运行上面的测试用例
cross_thread_should_work,可以看到输出了 2 个
wake_one,但是通过分析,应该只需要调用一次 wake_one
就足够了。
1 | test mutex::tests::cross_thread_should_work ... ok |
笔者的实现如下:
1 | fn lock_contended(state: &AtomicU32) { |
- 我们获取
state.compare_exchange(0,1)的返回值,如果成功,说明抢到锁,直接返回。 - 如果失败了:
- 原始值是
1,那我们就尝试将state从1交换为2,然后调用wait陷入休眠。 - 原始值是
2,说明已经有别的线程也被阻塞了,这个时候直接调用wait陷入休眠。
- 原始值是
- 当被
wake_one唤醒时,重新执行state.compare_exchange(0,1)抢占锁。
这个新的流程中,我们抢到锁的时候,state 会被正确的设置为
1 而不是 2,这个时候,在 drop
的时候就不会有不必要的 wake_one 的调用了。
sequenceDiagram
participant A as 线程A (持有锁)
participant B as 线程B (尝试获锁)
participant State as Mutex State
participant System as 系统调用
Note over State: state = 1 (线程A持有锁)
B->>State: compare_exchange(0, 1)
State-->>B: 失败,返回 s=1
Note over B: s == 1,尝试设置等待者标志
B->>State: compare_exchange(1, 2)
Note over State: state: 1 → 2
State-->>B: 成功
B->>System: wait(&state, 2)
Note over B: 线程B进入等待状态
Note over A: 线程A完成工作,释放锁
A->>State: unlock() - swap(0)
Note over State: state: 2 → 0
State-->>A: 返回 2
A->>System: wake_one()
Note over System: 正确唤醒线程B
Note over B: 线程B被唤醒,重新尝试获取锁
B->>State: compare_exchange(0, 1)
Note over State: state: 0 → 1 (关键!)
State-->>B: 成功!退出循环
Note over B: 🎯 获得锁,state=1 (正确状态)
Note over B: 使用锁进行工作...
B->>State: unlock() - swap(0)
Note over State: state: 1 → 0
State-->>B: 返回 1 (不是2!)
Note over B: ✅ 返回值是1,不调用wake_one
Note over System: 🎯 避免了不必要的系统调用
我们重新运行上面的测试用例
cross_thread_should_work,可以看到只输出了 1 个
wake_one:
1 | test mutex::tests::cross_thread_should_work ... ok |
不过笔者在做 benchmark 后发现书中的实现性能其实更高,在 macbook m2max 机器上,书中的版本要比我的版本快 5~10% 左右,猜测大概率是
swap的性能要比compare_exchange高。
v3:短暂自旋进一步避免系统调用
还有一种潜在的优化是,我们可以在抢锁失败且返回 state 为
1
的时候,进行短暂的自旋,如果实际场景中占用锁的时间非常短,那我们就可以再省略一次
wake 的系统调用了。
不过值得注意的是,这种优化未必是正向的,一方面,如果锁占用时间比较长,那前面的自旋就白白浪费了,另一方面,自旋的次数带来的性能消耗,未必就比系统调用要小(不同的平台表现可能很不一样)。
书中给出的经验值是自选 100 次。
优化后的 lock_contended 如下:
1 | fn lock_contended(state: &AtomicU32) { |
感兴趣的读者可以使用 criterion 做一个 benchmark 看看自旋与之前的版本的性能差异有多少。
完整代码可参考:conutils/mutex。
总结
在本篇中,我们从零开始,结合 Rust 原子操作和内存顺序的核心知识,实现一个 Rust 中的 Mutex 锁,逐步揭示 Mutex 背后的等待与唤醒机制,为更好理解标准库中的 Mutex 奠定了良好的基础。下篇,我们将尝试手写一个条件变量 Condition Variable!
Happy Coding! Peace~