系列文章:


本篇我们继续参考 Rust Atomics and Locks 一书来手写一个条件变量:Condition Variable,简称 Condvar

在本章开始之前,我们假设你已经:

  1. 熟悉并理解 Rust 的各种原子操作。
  2. 阅读过 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序Rust 原理丨从汇编角度看原子操作,并理解内存顺序和内存屏障的原理和使用方法。
  3. 理解 Rust UnsafeCell<T> 提供的内部可变性允许我们在持有共享引用 & 的时候可以对数据进行修改。
  4. 阅读过 Rust 原理丨操作系统并发原语 并了解 atomic-wait crate 中 wait/wake_one/wake_all 的适用场景和使用方法。

在 Rust 中,Condvar 是一个配合 Mutex 使用的线程同步原语,主要作用是让线程在满足某些“条件”之前主动睡眠(阻塞),待条件达成时再被唤醒。

典型的就是生产者和消费者模式,我们先来看一下标准库的 Condvar 如何使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#[cfg(test)]
mod tests {
use std::{
collections::VecDeque,
sync::{Condvar, Mutex},
thread,
time::Duration,
};

#[test]
fn condvar_usage() {
let queue = Mutex::new(VecDeque::new());
let not_empty = Condvar::new();

thread::scope(|s| {
// 消费者
s.spawn(|| loop {
let mut q = queue.lock().unwrap();
let item = loop {
if let Some(item) = q.pop_front() { // 从队列中获取数据
break item; // 获取到则返回
} else {
q = not_empty.wait(q).unwrap(); // 获取不到数据则阻塞等待
}
};
drop(q);
dbg!(item);
});

// 生产者
for i in 0..10 {
queue.lock().unwrap().push_back(i); // 往队列里面投放数据
not_empty.notify_one(); // 唤醒潜在的阻塞线程
thread::sleep(Duration::from_secs(1));
}
});
}
}

在上面这个例子中,我们实现了一对生产者和消费者:

  1. 消费尝试获取锁,并从队列中获取数据:
    1. 如果有,则释放锁并返回。
    2. 如果没有,则调用条件变量的 wait 陷入阻塞并释放锁
  2. 生产者尝试获取锁,并往队列中投放数据,并调用条件变量的 notify_one 唤醒潜在的阻塞线程。
flowchart LR
    A[生产者] -->|添加数据| B[队列]
    B -->|取数据| C[消费者]
    A -->|notify_one| C
    C -->|wait| A

读完本篇你能学到什么

你是否在多线程编程中遇到过这些令人头疼的问题:

🤔 性能浪费问题:消费者线程需要不断轮询检查数据是否就绪,即使没有数据也要持续占用 CPU,这种"忙等待"让程序效率低下?

🤔 复杂的同步逻辑:在生产者-消费者模式中,如何让消费者在没有数据时优雅地进入休眠,而不是无休止地检查?

🤔 竞态条件的困扰:如何确保在多线程环境下,唤醒操作不会丢失,线程不会因丢失唤醒而永远沉睡?

🤔 性能优化的疑惑:系统调用开销很大,如何避免在没有等待线程时进行无意义的唤醒操作?

🤔 内存顺序的选择:在实现同步原语时,到底该用 AcquireRelease 还是 Relaxed?如何分析 happens-before 关系?

如果这些问题曾经让你困惑,那么本文正是为你准备的。下面我们就正式开始从零开始构建一个条件变量(Condvar),用最直观的方式解答这些并发编程中的经典难题。

v1:基础实现

先来思考一下如何定义 Condvar 这个数据结构,参考标准库,它会有 3 个方法:

  • wait(MutexGuard): 释放 MutexGuard 并陷入等待。
  • notify_one(): 唤醒一个 wait 的线程。
  • notify_all(): 唤醒所有 wait 的线程。

看过本系列前面几篇的读者应该可以敏锐觉察到,这里就是对应了 atomic-wait 中的 wait/wake_one/wake_all

那局势就比较明朗了,我们可以在 condvar.wait(guard) 的时候调用 atomic_wait::wait(&atomic) ,然后在 condvar.notify_one() 的时候修改 &atomic 然后调用 atomic_wait::wake_one() 唤醒线程,condvar.notify_all() 也同理。

因此 Condvar 需要有一个 AtomicU32 类型的属性,这里我们称为 counter。故 Condvar 结构暂且定义如下:

1
2
3
4
5
6
7
8
9
10
11
pub struct Condvar {
counter: AtomicU32,
}

impl Condvar {
pub fn new() -> Self {
Self {
counter: AtomicU32::new(0),
}
}
}

notify_onenotify_all 所上所述,就非常简单了:

1
2
3
4
5
6
7
8
9
10
11
12
13
impl Condvar {
// ...

pub fn notify_one(&self) {
self.counter.fetch_add(1, Relaxed);
wake_one(&self.counter);
}

pub fn notify_all(&self) {
self.counter.fetch_add(1, Relaxed);
wake_all(&self.counter);
}
}

现在就剩下 wait 了,它的基本原理:

  1. 接收一个 MutexGuard;
  2. 释放 MutexGuard;
  3. 陷入等待,等待唤醒;
  4. 被唤醒后,再次抢占锁。

综上,我们可以有以下实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pub struct MutexGuard<'a, T> {
pub(crate) mutex: &'a Mutex<T>, // <---- 需要公开 mutex 字段,这里使用 pub(crate) 限制 crate 外部访问
}

impl Condvar {
//...

pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
let counter_value = self.counter.load(Relaxed);

// Unlock the mutex by dropping the guard,
// but remember the mutex so we can lock it again.
let mutex = guard.mutex;
drop(guard);

// Wait, but only if the counter hasn't changed since unlocking.
wait(&self.counter, counter_value);

mutex.lock()
}
}

注意,为了访问 guard.mutex,这里我们使用的是之前自己手写的 MutexGuard,并将 mutex 字段的私有程度修改为 pub(crate)。代码比较简单,这里就不赘述了,完整流程可参考下图理解。

我们修改一下测试用例,运行后发现也是可以通过的!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#[cfg(test)]
mod tests {
use std::{collections::VecDeque, thread, time::Duration};

use crate::{condvar::Condvar, mutex::Mutex};

#[test]
fn condvar_usage() {
let queue = Mutex::new(VecDeque::new());
let not_empty = Condvar::new();

thread::scope(|s| {
s.spawn(|| loop {
let mut q = queue.lock();
let item = loop {
if let Some(item) = q.pop_front() {
break item;
} else {
q = not_empty.wait(q);
}
};
drop(q);
dbg!(item);
});

for i in 0..10 {
queue.lock().push_back(i);
not_empty.notify_one();
thread::sleep(Duration::from_secs(1));
}
});
}
}

v2:减少不必要的系统调用

第 1 个版本中,我们在 notify_onenotify_all 分别都无条件调用了 wake_onewake_all 尝试唤醒潜在的线程,但是这个时候可能并没有线程被阻塞着,那这个系统调用就白白浪费了。

所以在这个版本中,我们尝试来优化这一点。为此,我们需要记录当前阻塞中的线程的数量,所以需要给 Condvar 加一个属性 num_waiters

1
2
3
4
5
6
7
8
9
10
11
12
13
pub struct Condvar {
counter: AtomicU32,
num_waiters: AtomicUsize,
}

impl Condvar {
pub fn new() -> Self {
Self {
counter: AtomicU32::new(0),
num_waiters: AtomicUsize::new(0),
}
}
}

notify_onenotify_all 的时候,我们仅当 num_waiters>0 的时候,才进行系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
impl Condvar {
// ...

pub fn notify_one(&self) {
if self.num_waiters.load(Relaxed) > 0 {
self.counter.fetch_add(1, Relaxed); // TODO: memory order
wake_one(&self.counter);
}
}

pub fn notify_all(&self) {
if self.num_waiters.load(Relaxed) > 0 {
self.counter.fetch_add(1, Relaxed); // TODO: memory order
wake_all(&self.counter);
}
}
}

wait 的时候,我们先标记自己是等待的,即 num_waiters++,然后在被唤醒后,解除这个标记,即 num_waiters--

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
impl Condvar {
// ...

pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
self.num_waiters.fetch_add(1, Relaxed); // TODO: memory order <---- New!!!

let counter_value = self.counter.load(Relaxed);

// Unlock the mutex by dropping the guard,
// but remember the mutex so we can lock it again.
let mutex = guard.mutex;
drop(guard);

// Wait, but only if the counter hasn't changed since unlocking.
wait(&self.counter, counter_value);

self.num_waiters.fetch_sub(1, Relaxed); //TODO: memory order <---- New!!!

mutex.lock()
}
}

OK,这里又到了最关键的问题了:操作 num_waiters 时该用什么内存顺序?

这个关键的问题的关键是什么呢?是要确定哪些地方需要建立 happens-before 关系!

很明显,我们这里的关键就是要防止 wake_one 的丢失,即确保如果一个线程即将进入等待状态,那么后续的通知操作能够看到这个等待者的存在。所以这里我们需要 notify_one() 中的 loadcondvar.wait() 中的 fetch_add 建立 happens-before 关系。至于 fetch_sub 就无所谓了,因为这个时候已经被唤醒了,丢失或者重复唤醒都无所谓了。

不过这里其实可以省掉这对 ReleaseAcquire,直接用 Relaxed!为什么呢?

  1. condvar.waitfetch_add 之前,我们必须先拿到 MutexGuard,即通过 lock() 抢占到锁,lock() 里面是啥操作?是一个 Acquire!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    impl<T> Mutex<T> {
    // ...

    pub fn lock(&self) -> MutexGuard<T> {
    lock_contended(&self.state);
    // Swap successfully, means locked.
    MutexGuard { mutex: self }
    }
    }

    fn lock_contended(state: &AtomicU32) {
    let mut spin_count = 0;
    while let Err(s) = state.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) {
    if s == 1 {
    if spin_count < 100 {
    spin_count += 1;
    std::hint::spin_loop();
    continue;
    }
    _ = state.compare_exchange(1, 2, Ordering::Acquire, Ordering::Relaxed);
    }
    wait(state, 2)
    }
    }
  2. 我们在调用 atomic-wait::wait 陷入等待之前,要先 drop(guard),别忘了,drop(guard) 里面是啥操作?是一个 Release

    1
    2
    3
    4
    5
    6
    7
    8
    impl<T> Drop for MutexGuard<'_, T> {
    fn drop(&mut self) {
    // If there are threads waiting for the lock, wait one of them.
    if self.mutex.state.swap(0, Ordering::Release) == 2 {
    wake_one(&self.mutex.state);
    }
    }
    }

所以呀,这里其实天然就已经有一对 ReleaseAcquire 了!happens-before 关系是成立的!所以我们之前的代码就已经满足要求了,再次运行前面的测试用例,依旧是顺利通过的!

完整代码可参考:conutils/condvar。具体流程你可以参考下图辅助理解。

sequenceDiagram
    participant Consumer as 消费者线程
    participant Producer as 生产者线程
    participant Mutex as Mutex状态
    participant NumWaiters as num_waiters
    participant Counter as counter

    Consumer->>Mutex: lock() [Acquire]
    Consumer->>Consumer: 检查条件,发现需要等待
    Consumer->>NumWaiters: fetch_add(1) [Relaxed]
    Consumer->>Counter: load() -> counter_value
    Consumer->>Mutex: drop(guard) [Release]
    Consumer->>Counter: wait(counter_value)

    Note over Producer: 生产者在另一个线程
    Producer->>Mutex: lock() [Acquire] ✅ 与Consumer的Release同步
    Producer->>Producer: 修改共享数据
    Producer->>Mutex: drop(guard) [Release]
    Producer->>NumWaiters: load() > 0? ✅ 看到Consumer的increment
    Producer->>Counter: fetch_add(1) [Relaxed]
    Producer->>Counter: wake_one()

    Consumer->>Consumer: 被唤醒
    Consumer->>Mutex: lock() [Acquire]
    Consumer->>NumWaiters: fetch_sub(1) [Relaxed]

另外,即使 notify_one()wait() 之前调用,atomic_wait::wait() 的语义也能保证正确性。因为 wait(&counter, expected_value) 只有在 counter 的值等于 expected_value 时才会阻塞,如果 counter 已经被修改,wait 会立即返回。

总结

通过本文的学习,我们从零开始实现了一个功能完整的条件变量(Condvar),并在这个过程中解决了多个重要问题:

  1. 理解条件变量的本质:Condvar 本质上是一个配合 Mutex 使用的线程同步工具,它解决了"如何让线程在条件不满足时休眠,条件满足时被唤醒"这一经典并发编程问题。

  2. 掌握两种实现策略

    • v1 基础版本:直接使用 atomic-wait 实现等待与唤醒机制
    • v2 优化版本:通过 num_waiters 计数器避免不必要的系统调用
  3. 深入理解内存顺序:通过分析 happens-before 关系,我们发现可以使用 Relaxed 内存顺序,因为 Mutex 的 Release/Acquire 操作已经提供了必要的同步保障。

掌握了这些知识后,你可以:

  • 在生产者-消费者场景中高效地同步线程
  • 理解标准库 std::sync::Condvar 的实现原理
  • 在设计自己的同步原语时做出正确的内存顺序选择
  • 识别并避免并发编程中的常见陷阱

条件变量虽然概念简单,但其背后涉及的原子操作、内存顺序、操作系统原语等知识却相当深入。通过亲手实现,我们不仅掌握了工具的使用,更重要的是理解了其背后的设计思想,这为我们后续学习更复杂的并发编程技巧打下了坚实基础。

下篇,我们将完成 Rust Atomics and Locks 的最后一个实战案例:手写一个读写锁(RwLock)!

Happy Coding! Peace~