系列文章:
- Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序
- Rust 原理丨从汇编角度看原子操作
- Rust 实战丨手写一个 SpinLock
- Rust 实战丨手写一个 oneshot channel
- Rust 实战丨手写一个 Arc
- Rust 原理丨操作系统并发原语
- Rust 实战丨手写一个 Mutex
- Rust 实战丨手写一个 Condvar
- Rust 实战丨手写一个 RwLock 👈 本篇
本篇我们继续参考 Rust Atomics and Locks 一书来手写一个读写锁:RwLock。这也是这本书中的最后一个实战案例了,很幸运我们能坚持到现在,为自己鼓掌 👏🏻 !
在本章开始之前,我们假设你已经:
- 熟悉并理解 Rust 的各种原子操作。
- 阅读过 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序 和 Rust 原理丨从汇编角度看原子操作,并理解内存顺序和内存屏障的原理和使用方法。
- 理解 Rust
UnsafeCell<T>提供的内部可变性允许我们在持有共享引用&的时候可以对数据进行修改。 - 阅读过 Rust
原理丨操作系统并发原语 并了解 atomic-wait crate 中
wait/wake_one/wake_all的适用场景和使用方法。
读完本篇你能学到什么
掌握读写锁的核心语义和基本原理
理解读锁可共享、写锁需独占的设计哲学
学会使用原子操作和 Guard 模式实现线程安全
掌握
UnsafeCell实现内部可变性的技巧
解决写线程无用循环问题
识别并发程序中的性能瓶颈
学会通过独立原子变量避免竞争
理解
atomic-wait的高效使用方式
巧妙解决写饥饿难题
掌握奇偶数编码表达复杂状态的艺术
理解并发系统中性能与公平性的权衡
学会设计状态机解决复杂同步问题
v1:基础实现
按照惯例,我们先来思考数据结构该如何定义。所谓读写锁,即读锁与读锁之间是可以共存,而读锁与写锁、写锁与写锁之间是互斥的。
- 我们需要一个变量
state来表示当前是否有读线程、有多少读线程、是否有写线程,这里可以用state的值来表示有多少读线程,当其为最大值时,表示有写线程。因为 atomic-wait 仅支持 AtomicU32,所以这里我们的数据类型也定义为 AtomicU32. - 抢到写锁的时候,是可以对数据
value进行修改的,但因我们只持有&RwLock共享引用,为了修改数据,我们依旧需要依赖UnsafeCell提供内部可变性。 - 我们需要 2 个守卫类型,分别作为读守卫和写守卫,它们都持有 RwLock
的引用。
- 读守卫只能获取共享引用,所以我们为其实现
Dereftrait。 - 写守卫可以获取独占引用,所以我们为其实现
Deref和DerefMuttrait。
- 读守卫只能获取共享引用,所以我们为其实现
- 另外,因为读锁是共享,写锁是独占,所以我们在为
RwLock<T>实现Synctrait 的时候,要求T必须满足Sync+Send。
综上,我们可以定出以下的数据结构:
1 | pub struct RwLock<T> { |
OK,现在我们需要来实现最重要的功能:上锁和解锁。
我们先来看上读锁和解锁:当 state != u32::MAX
的时候,我们就可以尝试对 state 进行加 1 抢占读锁,在销毁
ReadGuard 的时候,我们只需要对 state 进行减
1,当最后一个读锁释放的时候,还需要唤醒一个潜在的阻塞中的写线程。且这里,加
1 和减 1 需要用一对 Acquire 和 Release 建立 happens-before 关系。
代码如下:
1 | impl<T> RwLock<T> { |
现在来看上写锁和解锁:我们只需要尝试将 state 置为
u32::MAX,如果成功了,则说明上写锁成功,否则需要陷入等待。在解锁的时候,只需要将
state 置为
0,并唤醒所有潜在的阻塞的读线程和写线程。同样,这里也需要一对
Acquire 和 Release 建立 happens-before 关系。
代码如下:
1 | impl<T> RwLock<T> { |
到这里,我们一个基本可用的 RwLock
就实现完毕了,你可以参考下图进行辅助理解:
sequenceDiagram
participant A as 线程A (读者)
participant B as 线程B (读者)
participant C as 线程C (写者)
participant State as RwLock State
participant Wait as Wait/Wake 机制
Note over State: state = 0 (未锁定)
A->>State: read() - compare_exchange_weak(0, 1)
Note over State: state: 0 → 1
State-->>A: 成功,返回 ReadGuard
B->>State: read() - compare_exchange_weak(1, 2)
Note over State: state: 1 → 2 (两个读锁)
State-->>B: 成功,返回 ReadGuard
Note over A, B: 💡 读锁可以并发持有
C->>State: write() - compare_exchange(0, u32::MAX)
State-->>C: 失败 (state=2, 不等于0)
C->>Wait: wait(&state, 2)
Note over C: 🔒 写线程进入等待状态
Note over A: 线程A完成读操作
A->>State: drop(ReadGuard) - fetch_sub(1)
Note over State: state: 2 → 1
Note over A: 不是最后一个读锁,不唤醒
Note over B: 线程B完成读操作
B->>State: drop(ReadGuard) - fetch_sub(1)
Note over State: state: 1 → 0
Note over B: ✨ 最后一个读锁释放!
B->>Wait: wake_one(&state)
Note over C: 🎯 写线程被唤醒
C->>State: write() - compare_exchange(0, u32::MAX)
Note over State: state: 0 → u32::MAX
State-->>C: 成功,返回 WriteGuard
Note over C: 线程C执行写操作...
C->>State: drop(WriteGuard) - store(0)
Note over State: state: u32::MAX → 0
C->>Wait: wake_all(&state)
Note over Wait: 📢 唤醒所有等待的线程
我们来写下单元测试验证功能是否正确:
1 |
|
执行结果是通过的:
1 | running 2 tests |
v2:避免写线程无用循环
1 | impl<T> RwLock<T> { |
观察一下我们上个版本的 write()
实现,这里可能会出现一种情况:当上读锁非常频繁的时候,state
的值是一直在变化的,这个时候,wait(&self.state, s)
就有可能因为 state 的值发生了变化,不等于 s
了,就不会陷入等待,而是继续循环尝试将 state 从
0 置为
u32::MAX。这时如果上读锁非常频繁的话,那这里会一直尝试获取,且是无意义的尝试。
sequenceDiagram
participant W as 写线程
participant R1 as 读线程1
participant R2 as 读线程2
participant State as RwLock State
participant Wait as Wait 机制
Note over State: state = 1 (有一个读锁)
W->>State: write() - compare_exchange(0, u32::MAX)
State-->>W: 失败,返回 s=1
Note over W: 准备调用 wait(&state, 1)
R1->>State: drop(ReadGuard) - fetch_sub(1)
Note over State: state: 1 → 0 ⚡️ (在 wait 调用前状态就变了!)
W->>Wait: wait(&state, 1)
Note over Wait: ❌ 当前 state=0,不等于期望值1,立即返回!
Wait-->>W: 立即返回,没有真正等待
Note over R2: 💨 新读线程抢在 compare_exchange 前获取锁!
R2->>State: read() - compare_exchange_weak(0, 1)
Note over State: state: 0 → 1
Note over W: 🔄 进入下一轮循环
W->>State: compare_exchange(0, u32::MAX)
State-->>W: 失败,返回 s=1 (锁又被抢了!)
Note over W: 又准备调用 wait(&state, 1)
R2->>State: drop(ReadGuard) - fetch_sub(1)
Note over State: state: 1 → 0 ⚡️ (又在 wait 前变化了!)
W->>Wait: wait(&state, 1)
Wait-->>W: 又是立即返回!
Note over W: 🔄 无用循环开始...
Note over W: ⚠️ 写线程永远无法真正进入等待状态!
Note over W: 💀 CPU 被白白消耗在无效的重试上
Note over State: 🎯 问题核心:wait 调用前 state 总是被读锁改变
Note over State: 💡 解决:独立的 writer_wake_counter 避免这种竞争
这里问题的根源在于 read() 和
write() 监听的都是同一个原子变量
state。
解决这个问题的方案之一,就是可以另外加一个原子变量,专门给
write(),从而与 read()
区分开来,避免互相影响,从而造成 write()
时产生无用循环。
我们在 RwLock
中新增一个属性:writer_wake_counter,用于唤醒抢占写锁的线程。
1 | pub struct RwLock<T> { |
这个时候:
- 我们在
write()时没抢到锁,就不去wait(&state)了,而是wait(&writer_wake_counter)。 - 最后一个
ReadGuard在 drop 的时候,我们修改writer_wake_counter的值,并唤醒一个潜在的阻塞的写线程。 WriteGuard的时候,不仅要唤醒所有阻塞的读线程,还要唤醒一个潜在的阻塞的写线程。
1 | impl<T> RwLock<T> { |
通过这个简单的优化,我们就可以避免写线程因为 state
的频繁变化而产生无用循环了。再次运行之前的测试用例,顺利通过的话就没有问题啦!
v3:避免写饥饿
这个版本我们来做最后一个可尝试优化:避免写饥饿。
在之前的实现中,只要没上锁,或者上的是读锁,那么就可以一直不断地上读锁。而只有当没有任何读线程和写线程存在的时候,才可以上写锁。这就非常容易造成写饥饿了,因为那些后来的读线程依旧可以成功上读锁,而写线程抢到锁的条件太苛刻了。
为了解决这个问题,我们可以修改一下 state 的定义:
- 当
state为0的时候,说明没上锁。 - 当
state为u32::MAX的时候,说明上了写锁。 - 当
state为非 0 的偶数的时候,说明上了读锁,且没有阻塞中的写锁,这个时候可以继续上读锁。 - 当
state为其他奇数的时候,说明这个时候有阻塞中的写锁,后面来的读锁也需要阻塞。
1 | pub struct RwLock<T> { |
通过这样的调整,我们就可以相对公平地去避免写饥饿的问题了。为此,我们需要调整读写锁的上锁和解锁的逻辑。
read():
- 当
state为偶数的时候,我们可以尝试对其进行 +2 继续抢占读锁; - 当
state为奇数的时候,我们需要调用wait陷入等待。
ReadGuard.drop():
- 解锁的时候对
state进行 -2; - 如果之前的值是
3的话,说明这是最后一个释放的读锁,且存在被阻塞的写线程,这个时候需要调用wake_one进行唤醒。
write():
- 如果
state为0,则说明此时没有上锁:可以尝试将其置为u32::MAX抢占锁:- 如果成功,直接返回。
- 如果抢锁失败,则重新读取
state值进行重新判断。
- 如果
state为非 0 偶数,则说明此时已经上了读锁了,我们需要将state+1 设置为奇数,表示有写线程被阻塞着,阻止后面新来的读线程抢占读锁。 - 如果
state为非 0 奇数:- 如果
state等于 1,那说明既没有上写锁,也没有上读锁,但是有一个阻塞中的写线程,那有可能就是当前线程自己了!这个时候,我们依旧是可以尝试将其置为u32::MAX抢占锁,所以state为1的情况,可以跟state为0的情况进行合并处理。 - 如果
state大于 1,则说明已经上了读锁,这个时候,需要陷入等待,等前面的读锁都释放了,才能进入下一轮循环,重新尝试抢占写锁。
- 如果
WriteGuard.drop():
- 将
state置为0; - 唤醒潜在的所有阻塞的读线程和一个写线程。
我画了个流程图,供你辅助理解:
sequenceDiagram
participant R1 as 读线程1 (已持有)
participant R2 as 读线程2 (已持有)
participant W as 写线程 (等待)
participant R3 as 读线程3 (新来)
participant State as RwLock State
participant Wait as Wait/Wake 机制
Note over State: state = 4 (两个读锁,4 = 2×2)
Note over R1, R2: 💡 两个读线程正在工作...
W->>State: write() - 尝试获取写锁
Note over W: 发现有读锁,无法获取
W->>State: compare_exchange(4, 5) - 设置写等待标志
Note over State: state: 4 → 5 (奇数!有等待的写线程)
W->>Wait: wait(&writer_wake_counter, w)
Note over W: 🔒 写线程进入等待,但已设置奇数标志
Note over R3: 💨 新的读线程想要获取锁
R3->>State: read() - 检查 state % 2
Note over R3: state=5 是奇数,发现有等待的写线程!
R3->>Wait: wait(&state, 5)
Note over R3: 🚫 读线程被阻塞,无法抢占!
Note over R1: 读线程1完成工作
R1->>State: drop(ReadGuard) - fetch_sub(2)
Note over State: state: 5 → 3 (仍然是奇数)
Note over R1: 不是最后一个读锁,不唤醒
Note over R2: 读线程2完成工作
R2->>State: drop(ReadGuard) - fetch_sub(2)
Note over State: state: 3 → 1
Note over R2: ✨ 检测到 state=3,是最后一个读锁!
R2->>Wait: wake_one(&writer_wake_counter)
Note over W: 🎯 写线程被唤醒
W->>State: write() - compare_exchange(1, u32::MAX)
Note over State: state: 1 → u32::MAX
State-->>W: ✅ 成功获取写锁!
Note over W: 写线程执行关键操作...
Note over R3: 读线程3仍在等待,公平性得到保证!
W->>State: drop(WriteGuard) - store(0)
Note over State: state: u32::MAX → 0
W->>Wait: wake_all(&state) - 唤醒所有等待的读线程
Note over R3: 🎯 读线程3被唤醒,现在可以获取锁了
R3->>State: read() - compare_exchange_weak(0, 2)
Note over State: state: 0 → 2 (偶数,正常读锁)
综上分析,我们最新的代码如下:
1 | impl<T> RwLock<T> { |
这个时候,我们重新运行之前的测试用例,可以发现的顺利通过的!不过我们还需要加一个测试用例,来测试写饥饿是否被解决了!
1 |
|
在上面这个测试用例中,线程 2/3/4 在开始之前分别会睡眠
10/20/30ms,所以抢占锁的时间顺序是
w->r1->r2->w2->r。并且线程 2
在退出之前,还睡眠了 50ms,所以线程 3 和线程 4
在抢占锁的时候,r1/r2 都还没释放。
如果我们成功解决了写饥饿问题的话,那这个时候,线程 3 应该会将
state 置为奇数,防止线程 4 抢占读锁。所以当线程 4
抢到读锁的时候,线程 3 一定已经释放写锁了,即 vec
里面一定有 3 条数据!通过运行
writer_starvation_should_resolved
测试用例,很幸运,我们已经成功解决了写饥饿的问题了!
完整代码可以参考:conutils/rwlock。
总结
本篇文章通过三个渐进式版本完整展示了如何从零开始手写一个功能完备的读写锁(RwLock)。
我们的实现历程体现了系统编程中常见的优化思路:
v1 基础实现:实现了核心的读写锁语义,确保功能正确性
v2 性能优化:通过独立的
writer_wake_counter避免写线程无用循环,提升了性能v3 公平性保证:巧妙使用奇偶数编码解决写饥饿问题,在性能与公平性间找到平衡
通过这次实战,我们不仅掌握了 RwLock 的实现细节,更重要的是学会了并发编程的系统性思维方式。这些经验将在后续的系统编程实践中发挥重要作用。
这也是我们学习 Rust Atomics and Locks 这本优秀书籍的收官之战,笔者由衷地佩服 Mara Bos 能在短短 200 多页的篇幅将 Rust 的并发编程阐述得如何透彻、清晰且足够深入细节,笔者在整理本系列笔记的过程中,也真的是获益颇丰,希望也能给你带来一些收获,那我们下篇文章见!
Happy Coding! Peace~