系列文章:


继上篇 Rust 原理丨操作系统并发原语,我们学习了不同操作系统下的并发原语实现,理解了它们最重要的贡献就是提供了一套 wait/wake_one/wake_all 的机制。本篇,我们将借助 Rust Atomics and Locks 的作者 Mara Bos 封装的 atomic-wait crate,来手写一个自己的 Mutex

v1:基本实现

首先我们来思考一下如何定义数据结构:

  1. 我们需要 1 个原子变量 state 来记录锁的状态(0: unlocked, 1: locked),因为 atomic-wait 只支持 AtomicU32,所以这里我们的类型也定义为 AtomicU32。
  2. 另外我们需要一个 value 字段来保存数据,当抢到锁的时候,是可以对 value 进行修改的,但是这个时候只有共享引用,所以我们需要 UnsafeCell 来提供内部可变性。

同时,贯彻 RAII(Resource Acquisition Is Initialization,资源获取即初始化)原则:

  1. 我们在 lock(&self) 成功时返回一个 MutexGuard,它包含 &Mutex
  2. MutexGuard drop 的时候,我们将 state 重置为 0,表示释放锁,并唤醒一个潜在的阻塞线程。

为了让 Mutex 可以在线程之间共享,我们需要为其实现 Sync trait,而又因为 Mutex 实现的是独占访问,上锁成功的线程是拥有 T 的所有权的,即要求 T 可以在线程中转移,即要求 T 需要实现 Send trait。

综上,我们定义的 MutexMutexGuard 结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pub struct Mutex<T> {
/// 0: unlocked
/// 1: locked
state: AtomicU32,
value: UnsafeCell<T>,
}

pub struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
}

unsafe impl<T> Sync for Mutex<T> where T: Send {}

impl<T> Mutex<T> {
pub fn new(value: T) -> Self {
Self {
state: AtomicU32::new(0),
value: UnsafeCell::new(value),
}
}
}

为了方面访问内部数据,我们为 MutexGuard 实现 DerefDerefMut 这 2 个 trait:

1
2
3
4
5
6
7
8
9
10
11
12
impl<T> Deref for MutexGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.mutex.value.get() }
}
}

impl<T> DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { &mut *self.mutex.value.get() }
}
}

MutexGuard 离开作用域的时候,即被 drop 的时候,我们需要释放锁,并调用 wake_one 去唤醒一个潜在的阻塞线程:

1
2
3
4
5
6
impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
self.mutex.state.store(0, Release);
wake_one(&self.mutex.state);
}
}

这里将 state 设置为 0,使用的内存顺序是 Release,是为了跟 lock 的时候使用 Acquire 建立 happens-before 原则,确保 state 的真实值在各个线程中都是可见的。

lock 的时候,我们需要将 state 从 0 替换为 1,如果成功,则说明上锁成功,直接返回 MutexGuard,如果失败,则说明锁已经被抢占了,这个时候我们使用 atomic-waitwait() 陷入休眠,等待 wake_one 信号唤醒,再尝试抢锁。

1
2
3
4
5
6
7
8
9
10
impl<T> Mutex<T> {
// ...

pub fn lock(&self) -> MutexGuard<T> {
while self.state.swap(1, Acquire) == 1 {
wait(&self.state, 1);
}
MutexGuard { mutex: self }
}
}

至此,我们第一个版本的 Mutex 就完工了!是不是很简单!我们来写 2 个单元测试验证一下基本逻辑是否正确:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#[test]
fn one_thread_should_work() {
let l = Mutex::new(vec![]);
let mut guard = l.lock();
guard.push(1);
drop(guard);

let guard = l.lock();
assert_eq!(guard[0], 1);
}

#[test]
fn cross_thread_should_work() {
let l = Mutex::new(vec![]);

thread::scope(|s| {
s.spawn(|| {
let mut guard = l.lock();
guard.push(1);
sleep(Duration::from_millis(100)); // sleep for makeing the second thread to be blcoked.
});

sleep(Duration::from_millis(10)); // make sure the first thread get the lock
s.spawn(|| {
let mut guard = l.lock();
guard.push(2);
});
});

let guard = l.lock();
assert_eq!(guard.len(), 2);
}

运行成功:

1
2
3
running 2 tests
test mutex::tests::one_thread_should_work ... ok
test mutex::tests::cross_thread_should_work ... ok

v2:减少系统调用

MutexGuard 的时候,我们将 state 置为 0,并调用 wake_one 唤醒一个潜在的线程,这个时候如果没有阻塞中的线程的话,那这个系统调用就比较浪费了。

所以在 v2 版本我们尝试来优化这一点。为此,我们需要扩展我们的 state 字段,新增表示是否有阻塞线程的能力。

1
2
3
4
5
6
7
pub struct Mutex<T> {
/// 0: unlocked
/// 1: locked, but no blocked thread
/// 2: locked, but has blocked threads
state: AtomicU32,
value: UnsafeCell<T>,
}

修改了 state 的定义后我们需要修改上锁和解锁的逻辑,在上锁的时候,我们先尝试将 state0 置为 1,如果成功了,说明抢到了锁,否则,我们将 state 置为 2,表示有线程被阻塞了。

这里书中的实现是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
impl<T> Mutex<T> {
// ...

pub fn lock(&self) -> MutexGuard<T> {
lock_contended(&self.state);
MutexGuard { mutex: self }
}
}

fn lock_contended(state: &AtomicU32) {
if state.compare_exchange(0, 1, Acquire, Relaxed).is_err() {
while state.swap(2, Acquire) != 0 {
wait(&state, 2)
}
}
}

impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
if self.mutex.state.swap(0, Release) == 2 {
println!("wake_one");
wake_one(&self.mutex.state);
}
}
}

lock():

  • 如果成功将 state0 变换为 1,则说明当前线程抢锁成功,直接返回 MutexGuard
  • 如果失败了,就将 state 置为 2,然后调用 wait 进入休眠。

unlock():

  • state 置为 0,如果之前是 2 的话,那就说明有线程被阻塞着,这个时候才调用 wake_one 唤醒一个阻塞的线程。

这个地方,笔者觉得有一些问题, state.swap(2, Acquire) 这一行代码会无条件将 state 置为 2,也就是说,当这个线程抢到锁后,它在 unlock() 的时候,无论有没有在阻塞的线程,这个时候 state 都是 2,所以都会调用 wake_one

sequenceDiagram
    participant A as 线程A (持有锁)
    participant B as 线程B (尝试获锁)
    participant State as Mutex State
    participant System as 系统调用

    Note over State: state = 1 (线程A持有锁)

    B->>State: compare_exchange(0, 1)
    State-->>B: 失败 (返回 1)

    Note over B: 进入 while 循环
    B->>State: swap(2)
    Note over State: state: 1 → 2
    State-->>B: 返回 1 (≠ 0)

    B->>System: wait(&state, 2)
    Note over B: 线程B进入等待状态

    Note over A: 线程A释放锁
    A->>State: swap(0)
    Note over State: state: 2 → 0
    State-->>A: 返回 2

    A->>System: wake_one()
    Note over System: 正确的唤醒!线程B确实在等待

    Note over B: 线程B被唤醒,继续循环
    B->>State: swap(2)
    Note over State: state: 0 → 2
    State-->>B: 返回 0,退出循环

    Note over B: 获得锁,但state=2 (问题所在)
    Note over B: 使用锁...

    B->>State: unlock() - swap(0)
    Note over State: state: 2 → 0
    State-->>B: 返回 2

    B->>System: wake_one()
    Note over System: 不必要的调用!此时没有等待者

我们可以运行上面的测试用例 cross_thread_should_work,可以看到输出了 2 个 wake_one,但是通过分析,应该只需要调用一次 wake_one 就足够了。

1
2
3
4
5
6
7
test mutex::tests::cross_thread_should_work ... ok

successes:

---- mutex::tests::cross_thread_should_work stdout ----
wake_one
wake_one

笔者的实现如下:

1
2
3
4
5
6
7
8
fn lock_contended(state: &AtomicU32) {
while let Err(s) = state.compare_exchange(0, 1, Acquire, Relaxed) {
if s == 1 {
_ = state.compare_exchange(1, 2, Acquire, Relaxed);
}
wait(&state, 2)
}
}
  1. 我们获取 state.compare_exchange(0,1) 的返回值,如果成功,说明抢到锁,直接返回。
  2. 如果失败了:
    • 原始值是 1,那我们就尝试将 state1 交换为 2,然后调用 wait 陷入休眠。
    • 原始值是 2,说明已经有别的线程也被阻塞了,这个时候直接调用 wait 陷入休眠。
  3. 当被 wake_one 唤醒时,重新执行 state.compare_exchange(0,1) 抢占锁。

这个新的流程中,我们抢到锁的时候,state 会被正确的设置为 1 而不是 2,这个时候,在 drop 的时候就不会有不必要的 wake_one 的调用了。

sequenceDiagram
    participant A as 线程A (持有锁)
    participant B as 线程B (尝试获锁)
    participant State as Mutex State
    participant System as 系统调用

    Note over State: state = 1 (线程A持有锁)

    B->>State: compare_exchange(0, 1)
    State-->>B: 失败,返回 s=1

    Note over B: s == 1,尝试设置等待者标志
    B->>State: compare_exchange(1, 2)
    Note over State: state: 1 → 2
    State-->>B: 成功

    B->>System: wait(&state, 2)
    Note over B: 线程B进入等待状态

    Note over A: 线程A完成工作,释放锁
    A->>State: unlock() - swap(0)
    Note over State: state: 2 → 0
    State-->>A: 返回 2

    A->>System: wake_one()
    Note over System: 正确唤醒线程B

    Note over B: 线程B被唤醒,重新尝试获取锁
    B->>State: compare_exchange(0, 1)
    Note over State: state: 0 → 1 (关键!)
    State-->>B: 成功!退出循环

    Note over B: 🎯 获得锁,state=1 (正确状态)
    Note over B: 使用锁进行工作...

    B->>State: unlock() - swap(0)
    Note over State: state: 1 → 0
    State-->>B: 返回 1 (不是2!)

    Note over B: ✅ 返回值是1,不调用wake_one
    Note over System: 🎯 避免了不必要的系统调用

我们重新运行上面的测试用例 cross_thread_should_work,可以看到只输出了 1 个 wake_one:

1
2
3
4
5
6
test mutex::tests::cross_thread_should_work ... ok

successes:

---- mutex::tests::cross_thread_should_work stdout ----
wake_one

不过笔者在做 benchmark 后发现书中的实现性能其实更高,在 macbook m2max 机器上,书中的版本要比我的版本快 5~10% 左右,猜测大概率是 swap 的性能要比 compare_exchange 高。

v3:短暂自旋进一步避免系统调用

还有一种潜在的优化是,我们可以在抢锁失败且返回 state1 的时候,进行短暂的自旋,如果实际场景中占用锁的时间非常短,那我们就可以再省略一次 wake 的系统调用了。

不过值得注意的是,这种优化未必是正向的,一方面,如果锁占用时间比较长,那前面的自旋就白白浪费了,另一方面,自旋的次数带来的性能消耗,未必就比系统调用要小(不同的平台表现可能很不一样)。

书中给出的经验值是自选 100 次。

优化后的 lock_contended 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fn lock_contended(state: &AtomicU32) {
let mut spin_count = 0;
while let Err(s) = state.compare_exchange(0, 1, Acquire, Relaxed) {
if s == 1 {
if spin_count < 100 {
spin_count += 1;
std::hint::spin_loop();
continue;
}
_ = state.compare_exchange(1, 2, Acquire, Relaxed);
}
wait(&state, 2)
}
}

感兴趣的读者可以使用 criterion 做一个 benchmark 看看自旋与之前的版本的性能差异有多少。

完整代码可参考:conutils/mutex

总结

在本篇中,我们从零开始,结合 Rust 原子操作和内存顺序的核心知识,实现一个 Rust 中的 Mutex 锁,逐步揭示 Mutex 背后的等待与唤醒机制,为更好理解标准库中的 Mutex 奠定了良好的基础。下篇,我们将尝试手写一个条件变量 Condition Variable

Happy Coding! Peace~