系列文章:
- Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序
- Rust 原理丨从汇编角度看原子操作
- Rust 实战丨手写一个 SpinLock
- Rust 实战丨手写一个 oneshot channel
- Rust 实战丨手写一个 Arc
- Rust 原理丨操作系统并发原语
- Rust 实战丨手写一个 Mutex
- Rust 实战丨手写一个 Condvar 👈 本篇
- Rust 实战丨手写一个 RwLock
本篇我们继续参考 Rust
Atomics and Locks 一书来手写一个条件变量:Condition
Variable,简称 Condvar
。
在本章开始之前,我们假设你已经:
- 熟悉并理解 Rust 的各种原子操作。
- 阅读过 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序 和 Rust 原理丨从汇编角度看原子操作,并理解内存顺序和内存屏障的原理和使用方法。
- 理解 Rust
UnsafeCell<T>
提供的内部可变性允许我们在持有共享引用&
的时候可以对数据进行修改。 - 阅读过 Rust
原理丨操作系统并发原语 并了解 atomic-wait crate 中
wait/wake_one/wake_all
的适用场景和使用方法。
在 Rust 中,Condvar
是一个配合 Mutex
使用的线程同步原语,主要作用是让线程在满足某些“条件”之前主动睡眠(阻塞),待条件达成时再被唤醒。
典型的就是生产者和消费者模式,我们先来看一下标准库的
Condvar
如何使用:
1 |
|
在上面这个例子中,我们实现了一对生产者和消费者:
- 消费尝试获取锁,并从队列中获取数据:
- 如果有,则释放锁并返回。
- 如果没有,则调用条件变量的
wait
陷入阻塞并释放锁。
- 生产者尝试获取锁,并往队列中投放数据,并调用条件变量的
notify_one
唤醒潜在的阻塞线程。
flowchart LR A[生产者] -->|添加数据| B[队列] B -->|取数据| C[消费者] A -->|notify_one| C C -->|wait| A
读完本篇你能学到什么
你是否在多线程编程中遇到过这些令人头疼的问题:
🤔 性能浪费问题:消费者线程需要不断轮询检查数据是否就绪,即使没有数据也要持续占用 CPU,这种"忙等待"让程序效率低下?
🤔 复杂的同步逻辑:在生产者-消费者模式中,如何让消费者在没有数据时优雅地进入休眠,而不是无休止地检查?
🤔 竞态条件的困扰:如何确保在多线程环境下,唤醒操作不会丢失,线程不会因丢失唤醒而永远沉睡?
🤔 性能优化的疑惑:系统调用开销很大,如何避免在没有等待线程时进行无意义的唤醒操作?
🤔 内存顺序的选择:在实现同步原语时,到底该用
Acquire
、Release
还是
Relaxed
?如何分析 happens-before 关系?
如果这些问题曾经让你困惑,那么本文正是为你准备的。下面我们就正式开始从零开始构建一个条件变量(Condvar),用最直观的方式解答这些并发编程中的经典难题。
v1:基础实现
先来思考一下如何定义 Condvar
这个数据结构,参考标准库,它会有 3 个方法:
wait(MutexGuard)
: 释放 MutexGuard 并陷入等待。notify_one()
: 唤醒一个wait
的线程。notify_all()
: 唤醒所有wait
的线程。
看过本系列前面几篇的读者应该可以敏锐觉察到,这里就是对应了
atomic-wait
中的 wait/wake_one/wake_all
。
那局势就比较明朗了,我们可以在 condvar.wait(guard)
的时候调用 atomic_wait::wait(&atomic)
,然后在
condvar.notify_one()
的时候修改 &atomic
然后调用 atomic_wait::wake_one()
唤醒线程,condvar.notify_all()
也同理。
因此 Condvar
需要有一个 AtomicU32
类型的属性,这里我们称为 counter
。故 Condvar
结构暂且定义如下:
1 | pub struct Condvar { |
notify_one
和 notify_all
所上所述,就非常简单了:
1 | impl Condvar { |
现在就剩下 wait
了,它的基本原理:
- 接收一个 MutexGuard;
- 释放 MutexGuard;
- 陷入等待,等待唤醒;
- 被唤醒后,再次抢占锁。
综上,我们可以有以下实现:
1 | pub struct MutexGuard<'a, T> { |
注意,为了访问
guard.mutex
,这里我们使用的是之前自己手写的 MutexGuard,并将
mutex
字段的私有程度修改为
pub(crate)
。代码比较简单,这里就不赘述了,完整流程可参考下图理解。
我们修改一下测试用例,运行后发现也是可以通过的!
1 |
|
v2:减少不必要的系统调用
第 1 个版本中,我们在 notify_one
和
notify_all
分别都无条件调用了 wake_one
和
wake_all
尝试唤醒潜在的线程,但是这个时候可能并没有线程被阻塞着,那这个系统调用就白白浪费了。
所以在这个版本中,我们尝试来优化这一点。为此,我们需要记录当前阻塞中的线程的数量,所以需要给
Condvar
加一个属性 num_waiters
:
1 | pub struct Condvar { |
在 notify_one
和 notify_all
的时候,我们仅当 num_waiters>0
的时候,才进行系统调用:
1 | impl Condvar { |
在 wait
的时候,我们先标记自己是等待的,即
num_waiters++
,然后在被唤醒后,解除这个标记,即
num_waiters--
:
1 | impl Condvar { |
OK,这里又到了最关键的问题了:操作 num_waiters 时该用什么内存顺序?
这个关键的问题的关键是什么呢?是要确定哪些地方需要建立 happens-before 关系!
很明显,我们这里的关键就是要防止
wake_one
的丢失,即确保如果一个线程即将进入等待状态,那么后续的通知操作能够看到这个等待者的存在。所以这里我们需要
notify_one()
中的 load
和
condvar.wait()
中的 fetch_add
建立
happens-before 关系。至于 fetch_sub
就无所谓了,因为这个时候已经被唤醒了,丢失或者重复唤醒都无所谓了。
不过这里其实可以省掉这对 Release
和
Acquire
,直接用 Relaxed
!为什么呢?
在
condvar.wait
的fetch_add
之前,我们必须先拿到 MutexGuard,即通过lock()
抢占到锁,lock()
里面是啥操作?是一个Acquire
!1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24impl<T> Mutex<T> {
// ...
pub fn lock(&self) -> MutexGuard<T> {
lock_contended(&self.state);
// Swap successfully, means locked.
MutexGuard { mutex: self }
}
}
fn lock_contended(state: &AtomicU32) {
let mut spin_count = 0;
while let Err(s) = state.compare_exchange(0, 1, Ordering::Acquire, Ordering::Relaxed) {
if s == 1 {
if spin_count < 100 {
spin_count += 1;
std::hint::spin_loop();
continue;
}
_ = state.compare_exchange(1, 2, Ordering::Acquire, Ordering::Relaxed);
}
wait(state, 2)
}
}我们在调用
atomic-wait::wait
陷入等待之前,要先drop(guard
),别忘了,drop(guard)
里面是啥操作?是一个Release
!1
2
3
4
5
6
7
8impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) {
// If there are threads waiting for the lock, wait one of them.
if self.mutex.state.swap(0, Ordering::Release) == 2 {
wake_one(&self.mutex.state);
}
}
}
所以呀,这里其实天然就已经有一对 Release
和
Acquire
了!happens-before
关系是成立的!所以我们之前的代码就已经满足要求了,再次运行前面的测试用例,依旧是顺利通过的!
完整代码可参考:conutils/condvar。具体流程你可以参考下图辅助理解。
sequenceDiagram participant Consumer as 消费者线程 participant Producer as 生产者线程 participant Mutex as Mutex状态 participant NumWaiters as num_waiters participant Counter as counter Consumer->>Mutex: lock() [Acquire] Consumer->>Consumer: 检查条件,发现需要等待 Consumer->>NumWaiters: fetch_add(1) [Relaxed] Consumer->>Counter: load() -> counter_value Consumer->>Mutex: drop(guard) [Release] Consumer->>Counter: wait(counter_value) Note over Producer: 生产者在另一个线程 Producer->>Mutex: lock() [Acquire] ✅ 与Consumer的Release同步 Producer->>Producer: 修改共享数据 Producer->>Mutex: drop(guard) [Release] Producer->>NumWaiters: load() > 0? ✅ 看到Consumer的increment Producer->>Counter: fetch_add(1) [Relaxed] Producer->>Counter: wake_one() Consumer->>Consumer: 被唤醒 Consumer->>Mutex: lock() [Acquire] Consumer->>NumWaiters: fetch_sub(1) [Relaxed]
另外,即使
notify_one()
在wait()
之前调用,atomic_wait::wait()
的语义也能保证正确性。因为wait(&counter, expected_value)
只有在counter
的值等于expected_value
时才会阻塞,如果counter
已经被修改,wait
会立即返回。
总结
通过本文的学习,我们从零开始实现了一个功能完整的条件变量(Condvar),并在这个过程中解决了多个重要问题:
理解条件变量的本质:Condvar 本质上是一个配合 Mutex 使用的线程同步工具,它解决了"如何让线程在条件不满足时休眠,条件满足时被唤醒"这一经典并发编程问题。
掌握两种实现策略:
- v1 基础版本:直接使用
atomic-wait
实现等待与唤醒机制 - v2 优化版本:通过
num_waiters
计数器避免不必要的系统调用
- v1 基础版本:直接使用
深入理解内存顺序:通过分析 happens-before 关系,我们发现可以使用
Relaxed
内存顺序,因为 Mutex 的Release
/Acquire
操作已经提供了必要的同步保障。
掌握了这些知识后,你可以:
- 在生产者-消费者场景中高效地同步线程
- 理解标准库
std::sync::Condvar
的实现原理 - 在设计自己的同步原语时做出正确的内存顺序选择
- 识别并避免并发编程中的常见陷阱
条件变量虽然概念简单,但其背后涉及的原子操作、内存顺序、操作系统原语等知识却相当深入。通过亲手实现,我们不仅掌握了工具的使用,更重要的是理解了其背后的设计思想,这为我们后续学习更复杂的并发编程技巧打下了坚实基础。
下篇,我们将完成 Rust Atomics and Locks 的最后一个实战案例:手写一个读写锁(RwLock)!
Happy Coding! Peace~