系列文章:


本篇我们继续参考 Rust Atomics and Locks 一书来手写一个读写锁:RwLock。这也是这本书中的最后一个实战案例了,很幸运我们能坚持到现在,为自己鼓掌 👏🏻 !

在本章开始之前,我们假设你已经:

  1. 熟悉并理解 Rust 的各种原子操作。
  2. 阅读过 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序Rust 原理丨从汇编角度看原子操作,并理解内存顺序和内存屏障的原理和使用方法。
  3. 理解 Rust UnsafeCell<T> 提供的内部可变性允许我们在持有共享引用 & 的时候可以对数据进行修改。
  4. 阅读过 Rust 原理丨操作系统并发原语 并了解 atomic-wait crate 中 wait/wake_one/wake_all 的适用场景和使用方法。

读完本篇你能学到什么

  1. 掌握读写锁的核心语义和基本原理

    • 理解读锁可共享、写锁需独占的设计哲学

    • 学会使用原子操作和 Guard 模式实现线程安全

    • 掌握 UnsafeCell 实现内部可变性的技巧

  2. 解决写线程无用循环问题

    • 识别并发程序中的性能瓶颈

    • 学会通过独立原子变量避免竞争

    • 理解 atomic-wait 的高效使用方式

  3. 巧妙解决写饥饿难题

    • 掌握奇偶数编码表达复杂状态的艺术

    • 理解并发系统中性能与公平性的权衡

    • 学会设计状态机解决复杂同步问题

v1:基础实现

按照惯例,我们先来思考数据结构该如何定义。所谓读写锁,即读锁与读锁之间是可以共存,而读锁与写锁、写锁与写锁之间是互斥的。

  1. 我们需要一个变量 state 来表示当前是否有读线程、有多少读线程、是否有写线程,这里可以用 state 的值来表示有多少读线程,当其为最大值时,表示有写线程。因为 atomic-wait 仅支持 AtomicU32,所以这里我们的数据类型也定义为 AtomicU32.
  2. 抢到写锁的时候,是可以对数据 value 进行修改的,但因我们只持有 &RwLock 共享引用,为了修改数据,我们依旧需要依赖 UnsafeCell 提供内部可变性。
  3. 我们需要 2 个守卫类型,分别作为读守卫和写守卫,它们都持有 RwLock 的引用。
    • 读守卫只能获取共享引用,所以我们为其实现 Deref trait。
    • 写守卫可以获取独占引用,所以我们为其实现 DerefDerefMut trait。
  4. 另外,因为读锁是共享,写锁是独占,所以我们在为 RwLock<T> 实现 Sync trait 的时候,要求 T 必须满足 Sync+Send

综上,我们可以定出以下的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
pub struct RwLock<T> {
/// The number of read locks, u32::MAX if write locked.
state: AtomicU32,
value: UnsafeCell<T>,
}

pub struct ReadGuard<'a, T> {
rwlock: &'a RwLock<T>,
}

pub struct WriteGuard<'a, T> {
rwlock: &'a RwLock<T>,
}

unsafe impl<T> Sync for RwLock<T> where T: Sync + Send {}

impl<T> Deref for WriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.rwlock.value.get() }
}
}

impl<T> DerefMut for WriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.rwlock.value.get() }
}
}

impl<T> Deref for ReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.rwlock.value.get() }
}
}

impl<T> RwLock<T> {
pub fn new(value: T) -> Self {
Self {
state: AtomicU32::new(0),
value: UnsafeCell::new(value),
}
}
}

OK,现在我们需要来实现最重要的功能:上锁和解锁。

我们先来看上读锁和解锁:当 state != u32::MAX 的时候,我们就可以尝试对 state 进行加 1 抢占读锁,在销毁 ReadGuard 的时候,我们只需要对 state 进行减 1,当最后一个读锁释放的时候,还需要唤醒一个潜在的阻塞中的写线程。且这里,加 1 和减 1 需要用一对 Acquire 和 Release 建立 happens-before 关系。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
impl<T> RwLock<T> {
// ...

pub fn read(&self) -> ReadGuard<'_, T> {
let mut s = self.state.load(Relaxed);
loop {
if s < u32::MAX {
assert!(s != u32::MAX - 1, "too many readers");
// 尝试上读锁
match self.state.compare_exchange_weak(s, s + 1, Acquire, Relaxed) {
Ok(_) => return ReadGuard { rwlock: self },
Err(e) => s = e,
}
}
if s == u32::MAX { // 如果已经上了写锁,就陷入等待
wait(&self.state, u32::MAX);
s = self.state.load(Relaxed)
}
}
}
}

impl<T> Drop for ReadGuard<'_, T> {
fn drop(&mut self) {
if self.rwlock.state.fetch_sub(1, Release) == 1 { // 最后一个读锁释放
wake_one(&self.rwlock.state); // 唤醒一个潜在的写线程
}
}
}

现在来看上写锁和解锁:我们只需要尝试将 state 置为 u32::MAX,如果成功了,则说明上写锁成功,否则需要陷入等待。在解锁的时候,只需要将 state 置为 0,并唤醒所有潜在的阻塞的读线程和写线程。同样,这里也需要一对 Acquire 和 Release 建立 happens-before 关系。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
impl<T> RwLock<T> {
// ...

pub fn write(&self) -> WriteGuard<'_, T> {
// 尝试置为 u32::MAX
while let Err(s) = self.state.compare_exchange(0, u32::MAX, Acquire, Relaxed) {
wait(&self.state, s); // 陷入阻塞
}
WriteGuard { rwlock: self }
}
}

impl<T> Drop for WriteGuard<'_, T> {
fn drop(&mut self) {
self.rwlock.state.store(0, Release);
wake_all(&self.rwlock.state); // 唤醒所有阻塞的写线程和读线程
}
}

到这里,我们一个基本可用的 RwLock 就实现完毕了,你可以参考下图进行辅助理解:

sequenceDiagram
    participant A as 线程A (读者)
    participant B as 线程B (读者)
    participant C as 线程C (写者)
    participant State as RwLock State
    participant Wait as Wait/Wake 机制

    Note over State: state = 0 (未锁定)

    A->>State: read() - compare_exchange_weak(0, 1)
    Note over State: state: 0 → 1
    State-->>A: 成功,返回 ReadGuard

    B->>State: read() - compare_exchange_weak(1, 2)
    Note over State: state: 1 → 2 (两个读锁)
    State-->>B: 成功,返回 ReadGuard

    Note over A, B: 💡 读锁可以并发持有

    C->>State: write() - compare_exchange(0, u32::MAX)
    State-->>C: 失败 (state=2, 不等于0)
    C->>Wait: wait(&state, 2)
    Note over C: 🔒 写线程进入等待状态

    Note over A: 线程A完成读操作
    A->>State: drop(ReadGuard) - fetch_sub(1)
    Note over State: state: 2 → 1
    Note over A: 不是最后一个读锁,不唤醒

    Note over B: 线程B完成读操作
    B->>State: drop(ReadGuard) - fetch_sub(1)
    Note over State: state: 1 → 0
    Note over B: ✨ 最后一个读锁释放!
    B->>Wait: wake_one(&state)

    Note over C: 🎯 写线程被唤醒
    C->>State: write() - compare_exchange(0, u32::MAX)
    Note over State: state: 0 → u32::MAX
    State-->>C: 成功,返回 WriteGuard

    Note over C: 线程C执行写操作...

    C->>State: drop(WriteGuard) - store(0)
    Note over State: state: u32::MAX → 0
    C->>Wait: wake_all(&state)
    Note over Wait: 📢 唤醒所有等待的线程

我们来写下单元测试验证功能是否正确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#[cfg(test)]
mod tests {
use std::{
thread::{self, sleep},
time::Duration,
};

use super::*;

#[test]
fn one_thread_should_work() {
let rwl = RwLock::new(vec![1, 2, 3]);

let r1 = rwl.read();
assert_eq!(r1.len(), 3);

let r2 = rwl.read();
assert_eq!(r2.len(), 3);

drop(r1);
drop(r2);

let mut w = rwl.write();
w.push(4);
drop(w);

let r3 = rwl.read();
assert_eq!(r3.len(), 4);
}

#[test]
fn cross_thread_should_work() {
let rwl = RwLock::new(vec![]);

thread::scope(|s| {
s.spawn(|| {
let mut w = rwl.write();
w.push(1);
w.push(2);
});

s.spawn(|| {
sleep(Duration::from_millis(100));
let r1 = rwl.read();
println!("{:?}", *r1);
let r2 = rwl.read();
println!("{:?}", *r2);
});
})
}
}

执行结果是通过的:

1
2
3
running 2 tests
test rwlock::tests::one_thread_should_work ... ok
test rwlock::tests::cross_thread_should_work ... ok

v2:避免写线程无用循环

1
2
3
4
5
6
7
8
9
10
impl<T> RwLock<T> {
// ...

pub fn write(&self) -> WriteGuard<'_, T> {
while let Err(s) = self.state.compare_exchange(0, u32::MAX, Acquire, Relaxed) {
wait(&self.state, s);
}
WriteGuard { rwlock: self }
}
}

观察一下我们上个版本的 write() 实现,这里可能会出现一种情况:当上读锁非常频繁的时候,state 的值是一直在变化的,这个时候,wait(&self.state, s) 就有可能因为 state 的值发生了变化,不等于 s 了,就不会陷入等待,而是继续循环尝试将 state0 置为 u32::MAX。这时如果上读锁非常频繁的话,那这里会一直尝试获取,且是无意义的尝试。

sequenceDiagram
    participant W as 写线程
    participant R1 as 读线程1
    participant R2 as 读线程2
    participant State as RwLock State
    participant Wait as Wait 机制

    Note over State: state = 1 (有一个读锁)

    W->>State: write() - compare_exchange(0, u32::MAX)
    State-->>W: 失败,返回 s=1

    Note over W: 准备调用 wait(&state, 1)

    R1->>State: drop(ReadGuard) - fetch_sub(1)
    Note over State: state: 1 → 0 ⚡️ (在 wait 调用前状态就变了!)

    W->>Wait: wait(&state, 1)
    Note over Wait: ❌ 当前 state=0,不等于期望值1,立即返回!
    Wait-->>W: 立即返回,没有真正等待

    Note over R2: 💨 新读线程抢在 compare_exchange 前获取锁!
    R2->>State: read() - compare_exchange_weak(0, 1)
    Note over State: state: 0 → 1

    Note over W: 🔄 进入下一轮循环
    W->>State: compare_exchange(0, u32::MAX)

    State-->>W: 失败,返回 s=1 (锁又被抢了!)

    Note over W: 又准备调用 wait(&state, 1)

    R2->>State: drop(ReadGuard) - fetch_sub(1)
    Note over State: state: 1 → 0 ⚡️ (又在 wait 前变化了!)

    W->>Wait: wait(&state, 1)
    Wait-->>W: 又是立即返回!

    Note over W: 🔄 无用循环开始...
    Note over W: ⚠️ 写线程永远无法真正进入等待状态!
    Note over W: 💀 CPU 被白白消耗在无效的重试上

    Note over State: 🎯 问题核心:wait 调用前 state 总是被读锁改变
    Note over State: 💡 解决:独立的 writer_wake_counter 避免这种竞争

这里问题的根源在于 read()write() 监听的都是同一个原子变量 state

解决这个问题的方案之一,就是可以另外加一个原子变量,专门给 write(),从而与 read() 区分开来,避免互相影响,从而造成 write() 时产生无用循环。

我们在 RwLock 中新增一个属性:writer_wake_counter,用于唤醒抢占写锁的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
pub struct RwLock<T> {
/// The number of read locks, u32::MAX if write locked.
state: AtomicU32,
/// Incremented to wake up waiters.
writer_wake_counter: AtomicU32,
value: UnsafeCell<T>,
}

impl<T> RwLock<T> {
pub fn new(value: T) -> Self {
Self {
state: AtomicU32::new(0),
writer_wake_counter: AtomicU32::new(0),
value: UnsafeCell::new(value),
}
}

// ...
}

这个时候:

  1. 我们在 write() 时没抢到锁,就不去 wait(&state) 了,而是 wait(&writer_wake_counter)
  2. 最后一个 ReadGuard 在 drop 的时候,我们修改 writer_wake_counter 的值,并唤醒一个潜在的阻塞的写线程。
  3. WriteGuard 的时候,不仅要唤醒所有阻塞的读线程,还要唤醒一个潜在的阻塞的写线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
impl<T> RwLock<T> {
// ...

pub fn write(&self) -> WriteGuard<'_, T> {
while self
.state
.compare_exchange(0, u32::MAX, Acquire, Relaxed) // 尝试抢锁
.is_err()
{
// 失败的话取出 writer_wake_counter,供待会 `wait` 使用。
let w = self.writer_wake_counter.load(Acquire);
if self.state.load(Relaxed) != 0 { // 再次检查 state,如果为 0,就不阻塞了,再次尝试抢锁
wait(&self.writer_wake_counter, w); // 否则陷入等待
}
}
WriteGuard { rwlock: self } // 抢锁成功,返回守卫对象
}
}

impl<T> Drop for ReadGuard<'_, T> {
fn drop(&mut self) {
if self.rwlock.state.fetch_sub(1, Release) == 1 {
self.rwlock.writer_wake_counter.fetch_add(1, Release);
wake_one(&self.rwlock.writer_wake_counter);
}
}
}

impl<T> Drop for WriteGuard<'_, T> {
fn drop(&mut self) {
self.rwlock.state.store(0, Release);
self.rwlock.writer_wake_counter.fetch_sub(1, Release);
wake_one(&self.rwlock.writer_wake_counter); // 唤醒一个写线程
wake_all(&self.rwlock.state); // 唤醒所有的读线程
}
}

通过这个简单的优化,我们就可以避免写线程因为 state 的频繁变化而产生无用循环了。再次运行之前的测试用例,顺利通过的话就没有问题啦!

v3:避免写饥饿

这个版本我们来做最后一个可尝试优化:避免写饥饿

在之前的实现中,只要没上锁,或者上的是读锁,那么就可以一直不断地上读锁。而只有当没有任何读线程和写线程存在的时候,才可以上写锁。这就非常容易造成写饥饿了,因为那些后来的读线程依旧可以成功上读锁,而写线程抢到锁的条件太苛刻了。

为了解决这个问题,我们可以修改一下 state 的定义:

  1. state0 的时候,说明没上锁。
  2. stateu32::MAX 的时候,说明上了写锁。
  3. state 为非 0 的偶数的时候,说明上了读锁,且没有阻塞中的写锁,这个时候可以继续上读锁
  4. state 为其他奇数的时候,说明这个时候有阻塞中的写锁,后面来的读锁也需要阻塞
1
2
3
4
5
6
7
8
9
10
11
pub struct RwLock<T> {
/// 读线程的数量 *2,当有阻塞中的写线程的时候 +1
/// 如果 state=u32::MAX,说上了写锁。
///
/// 这意味着,当 state 是偶数的时候,可以继续上读锁,
/// 但是,如果 state 是奇数的话,上读锁会被阻塞。
state: AtomicU32,
/// Incremented to wake up waiters.
writer_wake_counter: AtomicU32,
value: UnsafeCell<T>,
}

通过这样的调整,我们就可以相对公平地去避免写饥饿的问题了。为此,我们需要调整读写锁的上锁和解锁的逻辑。

read():

  1. state 为偶数的时候,我们可以尝试对其进行 +2 继续抢占读锁;
  2. state 为奇数的时候,我们需要调用 wait 陷入等待。

ReadGuard.drop():

  1. 解锁的时候对 state 进行 -2
  2. 如果之前的值是 3 的话,说明这是最后一个释放的读锁,且存在被阻塞的写线程,这个时候需要调用 wake_one 进行唤醒。

write():

  1. 如果 state0,则说明此时没有上锁:可以尝试将其置为 u32::MAX 抢占锁:
    1. 如果成功,直接返回。
    2. 如果抢锁失败,则重新读取 state 值进行重新判断。
  2. 如果 state 为非 0 偶数,则说明此时已经上了读锁了,我们需要将 state +1 设置为奇数,表示有写线程被阻塞着,阻止后面新来的读线程抢占读锁。
  3. 如果 state 为非 0 奇数:
    1. 如果 state 等于 1,那说明既没有上写锁,也没有上读锁,但是有一个阻塞中的写线程,那有可能就是当前线程自己了!这个时候,我们依旧是可以尝试将其置为 u32::MAX 抢占锁,所以 state1 的情况,可以跟 state0 的情况进行合并处理。
    2. 如果 state 大于 1,则说明已经上了读锁,这个时候,需要陷入等待,等前面的读锁都释放了,才能进入下一轮循环,重新尝试抢占写锁。

WriteGuard.drop():

  1. state 置为 0
  2. 唤醒潜在的所有阻塞的读线程和一个写线程。

我画了个流程图,供你辅助理解:

sequenceDiagram
    participant R1 as 读线程1 (已持有)
    participant R2 as 读线程2 (已持有)
    participant W as 写线程 (等待)
    participant R3 as 读线程3 (新来)
    participant State as RwLock State
    participant Wait as Wait/Wake 机制

    Note over State: state = 4 (两个读锁,4 = 2×2)
    Note over R1, R2: 💡 两个读线程正在工作...

    W->>State: write() - 尝试获取写锁
    Note over W: 发现有读锁,无法获取

    W->>State: compare_exchange(4, 5) - 设置写等待标志
    Note over State: state: 4 → 5 (奇数!有等待的写线程)

    W->>Wait: wait(&writer_wake_counter, w)
    Note over W: 🔒 写线程进入等待,但已设置奇数标志

    Note over R3: 💨 新的读线程想要获取锁
    R3->>State: read() - 检查 state % 2
    Note over R3: state=5 是奇数,发现有等待的写线程!

    R3->>Wait: wait(&state, 5)
    Note over R3: 🚫 读线程被阻塞,无法抢占!

    Note over R1: 读线程1完成工作
    R1->>State: drop(ReadGuard) - fetch_sub(2)
    Note over State: state: 5 → 3 (仍然是奇数)
    Note over R1: 不是最后一个读锁,不唤醒

    Note over R2: 读线程2完成工作
    R2->>State: drop(ReadGuard) - fetch_sub(2)
    Note over State: state: 3 → 1
    Note over R2: ✨ 检测到 state=3,是最后一个读锁!

    R2->>Wait: wake_one(&writer_wake_counter)

    Note over W: 🎯 写线程被唤醒
    W->>State: write() - compare_exchange(1, u32::MAX)
    Note over State: state: 1 → u32::MAX
    State-->>W: ✅ 成功获取写锁!

    Note over W: 写线程执行关键操作...
    Note over R3: 读线程3仍在等待,公平性得到保证!

    W->>State: drop(WriteGuard) - store(0)
    Note over State: state: u32::MAX → 0
    W->>Wait: wake_all(&state) - 唤醒所有等待的读线程

    Note over R3: 🎯 读线程3被唤醒,现在可以获取锁了
    R3->>State: read() - compare_exchange_weak(0, 2)
    Note over State: state: 0 → 2 (偶数,正常读锁)

综上分析,我们最新的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
impl<T> RwLock<T> {
// ...

pub fn read(&self) -> ReadGuard<'_, T> {
let mut s = self.state.load(Relaxed);
loop {
// 如果是 0,则说明现在没有上锁,
// 如果是偶数,则说明现在上的是读锁,写没有阻塞中的写线程,
// 这两种情况,都可以尝试将 state+2 进行抢占读锁。
if s % 2 == 0 {
assert!(s != u32::MAX - 2, "too many readers");
match self.state.compare_exchange_weak(s, s + 2, Acquire, Relaxed) {
Ok(_) => return ReadGuard { rwlock: self }, // 抢占成功,直接返回。
Err(e) => s = e,
}
}

// 如果 state 为奇数,说明有阻塞中的写线程。
if s % 2 == 1 {
// 这时候,后面来的读线程都需要阻塞,等待写锁的获取和释放。
wait(&self.state, s);
s = self.state.load(Relaxed)
}
}
}

pub fn write(&self) -> WriteGuard<'_, T> {
let mut s = self.state.load(Relaxed);
loop {
// 0: 没有上锁,可以尝试抢锁。
// 1: 没有上锁,但有阻塞的写线程,可能就是自己,依旧可以尝试抢锁。
if s <= 1 {
// 尝试将 state 置为 u32::MAX,成功则直接返回
match self.state.compare_exchange(s, u32::MAX, Acquire, Relaxed) {
Ok(_) => return WriteGuard { rwlock: self },
Err(e) => {
s = e;
continue;
}
}
}

// 非 0 的偶数,说明现在上的是读锁,且之前没有阻塞的写线程。
if s % 2 == 0 {
// 将 state+1 置为奇数,表示已经有阻塞的写线程了,阻止后来的读线程抢占读锁。
match self.state.compare_exchange(s, s + 1, Relaxed, Relaxed) {
Ok(_) => {}
Err(e) => {
s = e;
continue;
}
}
}
let w = self.writer_wake_counter.load(Acquire);
s = self.state.load(Relaxed);
if s >= 2 {
// 如果已经上了读锁,则陷入等待,等待前面所有读锁的释放。
wait(&self.writer_wake_counter, w);
s = self.state.load(Relaxed);
}
}
}
}

// 释放读锁
impl<T> Drop for ReadGuard<'_, T> {
fn drop(&mut self) {
// 对 state -=2,如果之前的值是 3,则说明当前是最后一个释放的读锁,且存在阻塞中的写线程。
if self.rwlock.state.fetch_sub(2, Release) == 3 {
// 唤醒一个阻塞中的写线程。
self.rwlock.writer_wake_counter.fetch_add(1, Release);
wake_one(&self.rwlock.writer_wake_counter);
}
}
}

// 释放写锁
impl<T> Drop for WriteGuard<'_, T> {
fn drop(&mut self) {
// 将 state 重置为 0。
self.rwlock.state.store(0, Release);
// 唤醒一个潜在的阻塞的写线程。
self.rwlock.writer_wake_counter.fetch_sub(1, Release);
wake_one(&self.rwlock.writer_wake_counter);
// 唤醒所有潜在的阻塞的读线程。
wake_all(&self.rwlock.state);
}
}

这个时候,我们重新运行之前的测试用例,可以发现的顺利通过的!不过我们还需要加一个测试用例,来测试写饥饿是否被解决了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#[test]
fn writer_starvation_should_resolved() {
for _ in 0..100 { // 跑 100 次,避免偶然

let rwl = RwLock::new(vec![]);
thread::scope(|s| {
s.spawn(|| { // 线程 1
let mut w = rwl.write();
w.push(1);
w.push(2);
});

s.spawn(|| { // 线程 2
sleep(Duration::from_millis(10));
let r1 = rwl.read();
println!("{:?}", *r1);
let r2 = rwl.read();
println!("{:?}", *r2);
sleep(Duration::from_millis(50)); // stay locked to block after writers and readers
});

s.spawn(|| { // 线程 3
sleep(Duration::from_millis(20));
let mut w2 = rwl.write();
w2.push(3);
});

s.spawn(|| { // 线程 4
sleep(Duration::from_millis(30));
let r = rwl.read();
assert_eq!(r.len(), 3); // must get lock after w2
});
})
}
}

在上面这个测试用例中,线程 2/3/4 在开始之前分别会睡眠 10/20/30ms,所以抢占锁的时间顺序是 w->r1->r2->w2->r。并且线程 2 在退出之前,还睡眠了 50ms,所以线程 3 和线程 4 在抢占锁的时候,r1/r2 都还没释放。

如果我们成功解决了写饥饿问题的话,那这个时候,线程 3 应该会将 state 置为奇数,防止线程 4 抢占读锁。所以当线程 4 抢到读锁的时候,线程 3 一定已经释放写锁了,即 vec 里面一定有 3 条数据!通过运行 writer_starvation_should_resolved 测试用例,很幸运,我们已经成功解决了写饥饿的问题了!

完整代码可以参考:conutils/rwlock

总结

本篇文章通过三个渐进式版本完整展示了如何从零开始手写一个功能完备的读写锁(RwLock)。

我们的实现历程体现了系统编程中常见的优化思路:

  • v1 基础实现:实现了核心的读写锁语义,确保功能正确性

  • v2 性能优化:通过独立的 writer_wake_counter 避免写线程无用循环,提升了性能

  • v3 公平性保证:巧妙使用奇偶数编码解决写饥饿问题,在性能与公平性间找到平衡

通过这次实战,我们不仅掌握了 RwLock 的实现细节,更重要的是学会了并发编程的系统性思维方式。这些经验将在后续的系统编程实践中发挥重要作用。

这也是我们学习 Rust Atomics and Locks 这本优秀书籍的收官之战,笔者由衷地佩服 Mara Bos 能在短短 200 多页的篇幅将 Rust 的并发编程阐述得如何透彻、清晰且足够深入细节,笔者在整理本系列笔记的过程中,也真的是获益颇丰,希望也能给你带来一些收获,那我们下篇文章见!

Happy Coding! Peace~