系列文章:
继上篇 Rust
实战丨手写一个 oneshot channel,本篇我们继续参考 Rust Atomics and Locks
一书,来实现一个 Arc
。
Arc 简介
Arc
(Atomic Reference Counted)是 Rust
标准库里位于 std::sync
模块中的智能指针,用于
在多个线程之间安全地共享只读数据。和只适用于单线程场景的
Rc<T>
不同,Arc<T>
的引用计数增减操作使用原子指令,从而保证跨线程的内存安全。
为什么需要 Arc?
1 | let p = Person { age: 18, name: "hedon".to_string(), address: "China".to_string() }; |
问题:thread::spawn
要求
'static
生命周期,但 &p
只是栈上变量的借用。
解决:使用 Arc::new(p)
把数据移到堆上,通过原子引用计数实现多所有权:
1 | let p = Arc::new(Person { /* ... */ }); |
读完本篇你能学到什么
- 原子操作的内存顺序选择:通过引用计数这个具体例子,理解
Relaxed / Acquire / Release / fence
的使用场景。 - 弱引用解决循环引用:
Weak<T>
的设计原理和在图结构中的应用。 - Arc::get_mut 的安全性保证:理解「非原子两步校验」的巧妙设计。
- 零成本抽象的实现细节:
UnsafeCell
+ManuallyDrop
相比Option<T>
的优势。
v0: 基础引用计数
数据结构
我们先来分析一下基本的数据结构该如何定义,在之前的 Rust
实战丨手写一个 SpinLock,我们讲到了 C++/Rust 常用的并发编程方式
RAII(Resource Acquisition Is
Initialization,资源获取即初始化),其核心思想是:在对象构造函数中获取资源,在析构函数中释放资源。上面的案例中,我们在初始化
Person
的时候其实也是应用了这种思路。
1 | let p = Arc::new(Person { |
所以我们的自定义 Arc
需要泛型
T
,使其可以承载不同的数据类型的数据
data
,同时为了做引用计数,它需要一个数值类型
ref_count
来计数,为了并发安全,我们可以选择原子类型
AtomicUsize
。
在 Arc
中,我们需要自己管理 data
的生命周期,除了使用裸指针 *mut T
或 *const T
之外,我们可以使用 std::ptr::NonNull<T>
,它是一个零成本、保证非空、支持协变(可安全向子类型转换)的裸指针包装器,具体可参考附录
1. NonNull<T>。
综上,我们可以定义以下数据结构:
1 | struct ArcData<T> { |
- 我们定义了
ArcData
和Arc
两个结构,其中ArcData
包含引用计数ref_count
和实际数据data
。 - 在
Arc
中,我们使用NonNull
来管理ArcData
的生命周期。初始化时,先用Box::new()
在堆上分配内存,再通过Box::leak()
放弃Box<T>
的所有权,交由Arc
自行管理。
graph TB subgraph Stack ["栈内存 Stack"] ArcStruct["Arc<T>
ptr: NonNull<ArcData<T>>"] end subgraph Heap ["堆内存 Heap"] ArcDataStruct["ArcData<T>
ref_count: AtomicUsize(1)
data: T"] end ArcStruct -->|指向| ArcDataStruct subgraph Process ["内存分配过程"] Step1["Box::new(ArcData)"] Step2["Box::leak()"] Step3["NonNull::from()"] Step1 --> Step2 Step2 --> Step3 end classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px classDef processStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px class ArcStruct stackStyle class ArcDataStruct heapStyle class Step1,Step2,Step3 processStyle
另外,跨线程发送 Arc<T>
会导致 T
对象被共享,即 T
需要满足 Sync
trait,而跨线程发送 Arc<T>
也会导致需要由另外一个线程来释放 T
,所以需要 T
满足 Send
trait。所以只有当 T
满足
Send+Sync
的时候,Arc<T>
才是
Send
的,对于 Sync
也是同理,因为我们可以为
Arc<T>
分别实现 Sync
和
Send
trait:
1 | unsafe impl<T: Send + Sync> Send for Arc<T> {} |
为了能够便捷的获取 data
,我们先为
Arc<T>
实现一个 data()
用于获取
ArcData<T>
,同时为其实现 Deref
trait,用于像指针一样无感操作 data: T
:
1 | impl<T> Arc<T> { |
维护引用计数
基础部分我们已经铺垫完了,现在我们需要来实现 2 个最重要的 trait 了:
- Clone:
Arc
引用计数的关键,在每次clone()
的时候,我们不拷贝data
,而是让ref_count
自增,进行引用计数。 - Drop: 在
Arc<T>
实例离开作用域的时候,我们需要让ref_count
自减,同时在最后一个Arc<T>
被销毁时,我们需要主动释放ArcData<T>
的内存资源。
我们先来实现 Clone
:
1 | impl<T> Clone for Arc<T> { |
因为这里没有其他原子操作需要跟当前操作建立严格的
happens-before
关系,所以这里我们可以使用最松的
Relaxed
内存顺序。
接下来看下 Drop
:
1 | impl <T> Drop for Arc<T> { |
对于
Drop
,我们就需要好好想一想需要什么内存顺序约束了。在最后一个
drop
的时候,我们有 2 个目标:
- 不能过早释放:确保在引用计数减到 0 并销毁对象之前,没有别的线程仍在使用这份数据;
- 要看得见别人写的东西:如果别的线程在它们各自的
drop
里面对共享对象做了写入,最后一个线程做析构时必须"看到"这些写入,否则就可能出现数据竞争或次序错误。
换言之,我们需要最后一个 fetch_sub
跟前面其他每一个
fetch_sub
都建立起 happens before 关系,也即我们需要一对
Release 和 Acquire 来保证 happens-before。
这里简单回顾一下 Release
和
Acquire
,不熟悉的读者可以参阅:Rust 原理丨聊一聊
Rust 的 Atomic 和内存顺序。
Release
: 作用于写操作(store),确保该操作之前的所有内存访问不会被重排到这个 Release 操作之后。Acquire
: 作用于读操作(load),确保该操作之后的所有内存访问不会被重排到这个 Acquire 操作之前。- 当一个线程通过
Acquire
读取到另一个线程通过Release
写入的值时,会建立一个 happens-before 关系:线程 A 中 Release 写入之前的所有内存写操作,对于线程 B 中 Acquire 读取之后的所有内存读操作都是可见的。
Release-Sequence 概念补充:当多个线程对同一个原子变量执行 Release 操作时,这些操作会形成一个 "release-sequence"。后续任何一个 Acquire 操作读取到这个序列中的任意值,都能与整个序列建立 happens-before 关系。这正是为什么我们的
fence(Acquire)
能够与之前所有线程的fetch_sub(Release)
形成同步的关键。
我们当然可以使用 Release
和 Acquire
的结合体 AcqRel
来一步到位解决这个问题。不过考虑到只有最后一个 drop
需要满足这个关系,我们可以尝试做得更优雅一些。
在这种仅需在临界值保证 happens-before
的场景下,我们都可以单独在临界情况下使用一个 fence
来建立起
happens-before。
具体来说:
- 对于非最终
drop
:我们只需要使用Release
,即fetch_sub(1, Ordering::Release)
,它保证了别的线程如果最终做"最后一次 drop",只要它对相同原子执行一条 Acquire 操作,它就能同步到前面所有线程对数据做过的改动。 - 对于最终
drop
:在调用fetch_sub
的时候我们仍需要Release
语义,但是调用时,我们并不知道自己是不是"最后那个线程"。当fetch_sub
返回1
的时候,说明我们是"最后那个线程"。这个时候,我们需要建立一个fetch(Acquire)
,与之前的所有Release
形成配对,确保看到之前所有的历史写入,这个时候我们才能确定已经没有别的线程在使用数据了,我们才可以安全地销毁对象。
如下图所示;
sequenceDiagram participant A as Thread A participant B as Thread B participant C as Thread C
(最后 drop) %% 普通写入 A->>A: write shared data … B->>B: write shared data … %% 非最终 drop:Release 写 A->>A: fetch_sub (Release) ⬅ 计数 n→n-1 B->>B: fetch_sub (Release) ⬅ 计数 n-1→n-2 %% 形成 release-sequence Note over A,B: 这两次 Release 写组成
同一条 release-sequence %% 最终 drop:Release 写 & 返回 1 C->>C: fetch_sub (Release) ⬅ 返回 1 → 计数 0 %% Acquire fence 同步 C-->>C: fence (Acquire)
【接收 release-sequence】 %% 安全析构 C->>C: drop(Box::from_raw) %% 结果说明 Note over C: fence (Acquire) 使 A/B
对共享数据的写必定
在析构前可见
经过这么一顿分析后,我们最终的 Drop
实现如下:
1 | impl<T> Drop for Arc<T> { |
实现逻辑:
- 所有 drop 都用
Release
语义减引用计数,为可能的"最后一次 drop"做准备 - 只有最后一次 drop(返回值为 1)才需要额外的
fence(Acquire)
与之前所有的 Release 建立 happens-before - 安全析构:现在可以确保看到所有历史写入,没人再持有引用
完整代码
自此,我们第一个版本的 Arc
就实现完毕了,我们来看一下最终完成的代码:
1 | use std::sync::atomic::{AtomicUsize, Ordering, fence}; |
单元测试
写个单元测试验证一下:
1 |
|
执行结果是 ok 的:
1 | running 1 test |
v1: 实现 get_mut 获取可变引用
按照前面版本的实现,我们是不能为 Arc<T>
实现
DerefMut
trait 的,因为我们不能无条件地提供
&mut T
可变引用,因为它可能同时被其他的
Arc<T>
所访问中。
不过,当满足一定条件的时候,我们还是可以提供 &mut T
可变引用的。具体来说,需要满足 2 个条件:
- 使用
&mut Arc<T>
保证当前Arc<T>
只有一个地方在使用; - 需要确保全局只有一个
Arc<T>
。
为了避免跟 DerefMut
混淆,我们将其声明为一个静态方法,并将 Arc<T>
作为参数,同时因为只有满足特定条件的情况下,才能返回
&mut T
,所以我们使用 Option
作为返回值。如下:
1 | pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> { |
这里我们在确定只有一个
Arc
实例的时候,使用fetch(Acquire)
来跟所有其他Arc
执行drop()
的fetch_sub(Release)
建立 happens-before 关系。因为此时
ref_count=1
,说明仅有当前Arc
这一个实例,而get_mut
需要的又是独占引用,所以当前Arc
不可能再被拿去做clone()
操作,所以在这个情况下,是可以保证有且仅有一个Arc
实例,所以我们是可以安全返回&mut T
的。返回的
&mut T
会隐式地借用参数&mut Arc<T>
的生命周期,而 Rust 不允许可变引用和只读引用交叉存在,所以当前仅剩的这个Arc
,直到&mut T
作用域结束之前,都是不可用的,所以在那之前,不会再被拿去执行clone()
和Deref
等操作,所以是安全的。1
2
3
4
5
6
7
8
9
fn get_mut_should_be_safe() {
let mut x = Arc::new(vec![]);
let v = Arc::get_mut(&mut x).unwrap();
v.push(1);
let y = x.clone(); // <---- cannot borrow `x` as immutable
v.push(2);
}如上面这个例子,就会报错:
1
2
3
4
5
6
7
8
9
10error[E0502]: cannot borrow `x` as immutable because it is also borrowed as mutable
src/arc.rs:117:17
|
114 | let v = Arc::get_mut(&mut x).unwrap();
| ------ mutable borrow occurs here
...
117 | let y = x.clone();
| ^ immutable borrow occurs here
118 | v.push(2);
| - mutable borrow later used here
v2: 弱指针 Weak<T> 解决循环引用
循环引用问题分析
graph TD A --> B; A --> C;
引用计数在各种数据结构的表示中非常有用,如上图所示的树形结构,当释放 A 的时候,因为 B 和 C 不再被引用,所以也可以顺带释放了。
但是,如果 B 和 C 也持有对 A 的引用,即形成了循环引用,那按照我们之前的实现,A、B、C 都将永远不会被释放了,因为它们的引用计数永不为 0,哪怕它们三者均不再被使用。如下图所示:
graph TD A <--> B; A <--> C; B -.-> A; C -.-> A;
为了应对这种场景,Rust
的标准库中提出的解决方案是:Weak<T>
,也称弱指针。T
可以在 Arc<T>
和 Weak<T>
之间共享,当所有的 Arc<T>
都失效的时候,T
被释放,无论此时是否有
Weak<T>
的存在。
如下图所示,父节点对子节点使用的是强引用,确保了父节点存活的时候,子节点都存在。而子节点对父节点使用的是弱引用,只即保留了子节点回溯父节点的能力,也不会阻止父节点的释放。
当父节点被释放时,所有强引用计数归零,节点可以依次被释放。
graph TD A -- Arc --> B; A -- Arc --> C; B -.->|Weak| A; C -.->|Weak| A;
数据结构
OK,做了这么多铺垫后,我们来思考一下现在的 ArcData
该如何调整。
- 之前我们用
ref_count
做引用计数,它代表的都是强引用,现在我们需要记录弱引用数量的相关字段。 - 当只有弱引用的时候,
data
就已经被释放了,我们需要使用None
来表示这种情况,所以data
应该是一个Option<T>
类型。 - 当
ArcData<T>
被一个Arc
和多个Weak
共享时,释放最后一个Arc
时,我们仅拥有&ArcData<T>
不可变引用,这个时候我们需要将其从Some(T)
置为None
,即要在不可变引用上实现修改,这就涉及到了前几篇提到的:内部可变性。UnsafeCell 是 Rust 提供的一个内部可变性工具类型,它包装一个数据,使得即使在只有不可变引用的情况下也可以进行修改(当然需要在unsafe
块中操作)。所以我们需要将data
再用UnsafeCell
包一层,以满足此场景的需求。
综上,最新的 ArcData
定义如下:
1 | struct ArcData<T> { |
然后我们需要定义一个新结构 Weak<T>
,用来表示弱引用,假定这个时候,我们将维护 ArcData
存活的职责交给 Weak<T>
,那就可以将
ArcData
转给 Weak<T>
持有,然后在
Arc<T>
中持有一个 Weak<T>
:
1 | struct ArcData<T> { |
在上述代码中,除了数据结构之外,我们做了 3 点调整:
- 只需要为
Weak<T>
实现Sync
和Send
trait,这个时候Arc<T>
就会被自动实现这 2 个 triat。 data(&self)
辅助函数,移到了Weak<T>
身上。Arc<T>
要从Deref
获取&T
,需要先经过一道Weak<T>
。
graph LR subgraph Stack ["栈内存 Stack"] ArcStruct["Arc<T>
weak: Weak<T>"] WeakStruct["Weak<T>
ptr: NonNull<ArcData<T>>"] end subgraph Heap ["堆内存 Heap"] ArcDataStruct["ArcData<T>
data_ref_count: AtomicUsize(1)
alloc_ref_count: AtomicUsize(1)
data: UnsafeCell<Option<T>>"] end ArcStruct -->|包含| WeakStruct WeakStruct -->|指向| ArcDataStruct classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px classDef processStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px classDef refStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px class ArcStruct,WeakStruct stackStyle class ArcDataStruct heapStyle class Step1,Step2,Step3,Step4,Step5 processStyle class DataRef,AllocRef,DataContent refStyle
维护引用计数
现在重点来了,我们需要来思考 Weak<T>
和
Arc<T>
的 Clone
和 Drop
该如何实现。其实重点就是该如何管理 alloc_ref_count
和 data_ref_count
的计数。
再次明确下我们的定义:
data_ref_count
: 代表的是强引用Arc<T>
的数量。alloc_ref_count
: 代表的是Arc<T>
+Weak<T>
的数量。
因此:
- Clone:
- 当
Weak<T>
拷贝时,仅增加了弱引用的数量,所以我们只需对alloc_ref_count
进行自增。 - 当
Arc<T>
拷贝时,不仅需要拷贝内部的Weak<T>
,还增加了强引用的数量,所以我们还需要对data_ref_count
进行自增。
- 当
- Drop:
- 当
Arc<T>
被释放时,不仅释放了其内部的Weak<T>
,还减少了一个强引用,所以需要对data_ref_count
进行自减。另外,如果是最后一个Arc<T>
被释放,我们需要将ArcData<T>.data
置为None
。 - 当
Weak<T>
被释放时,我们仅需减少弱引用的数量,即对alloc_ref_count
进行自减。另外,当最后一个Weak<T>
被释放时(此时肯定也没有Arc<T>
了),我们还需要负责释放ArcData<T>
。
- 当
综上,我们的实现如下:
1 | impl<T> Clone for Weak<T> { |
这里考虑到实际场景中的内存限制,我们对引用计数进行了简单的限制,当其超过
usize::Max/2
的时候,就执行std::process:abort()
让整个进程崩溃。
get_mut()
接下来我们来考虑下 get_mut(arc: &mut Arc<T>)
该如何修改。什么时候可以返回
&mut T
,很明显,当且仅当只有一个
Arc<T>
的时候才可以。
故 get_mut
的实现修改如下:
1 | pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> { |
到这里,我们执行之前的 arc_should_work
测试用例,可以发现是顺利通过的。
dowgrade()
回顾下面这张图,为了提供 A 的 Weak<T>
指针,我们需要给 Arc<T>
提供一个
downgrade()
方法,用于将强引用降为弱引用,这是必然可以成功的。同时,当我们也可以为
Weak<T>
提供一个 upgrade()
方法,用于持有弱引用的情况下可以尝试访问数据,当然这未必能成功。
graph TD A -- Arc --> B; A -- Arc --> C; B -.->|Weak| A; C -.->|Weak| A;
downgrade()
比较简单,我们直接 clone()
一个
Weak<T>
就可以了:
1 | impl<T> Arc<T> { |
upgrade()
upgrade()
就比较复杂了,只有当存在 Arc
的时候,data
才没被释放,这个时候,才能返回升级后的
Arc<T>
,即要求 data_ref_count>0
。
1 | impl<T> Weak<T> { |
我们来写一下测试用例,验证使用了 Weak<T>
后,我们的资源能否正确释放。在这之前,我们先写一个非
Weak<T>
版本的,看看资源是否真的没有被释放:
1 |
|
执行上述用例后,我们可以发现 NUM_DROPS
还是 0,即
root/child1/child2
均没有被释放资源。
你可以将 parent
的引用修改为弱引用,然后再重新执行测试用例,就会发现
root/child1/child2
的资源都被正确释放了。
1 | struct Node { |
v3: 分离强弱引用,避免无用消耗
上个版本我们通过引入了弱引用
Weak<T>
,成功解决了循环引用的问题,这是个非常大的进步!
不过我们仍然有进一步优化的空间,可以观察到,Arc<T>
的每次拷贝,都会伴随一次拷贝
Weak<T>
,但是,很多时候,我们其实没有循环引用关系的,也即我们并不是每一次都需要
Weak<T>
,所以上个版本的实现,其实是有很多的浪费的。
数据结构
所以我们可以考虑将 Weak<T>
从
Arc<T>
中分离出来:
1 | pub struct Arc<T> { |
同时我们需要对 ArcData<T>
结构中的引用技术进行重新定义:
1 | struct ArcData<T> { |
我们总共做了 3 个重要的调整:
Arc<T>
不再包含Weak<T>
,而是各自独立,不过它们之前会共享底层的ArcData<T>
。alloc_ref_count
的语义,从"Arc + Weak 的数量",变成"Weak 的数量,当存在 Arc 时,额外加 1"。换言之,对于所有的
Arc<T>
,都共有一个隐式的Weak<T>
,当释放最后一个Arc<T>
的时候,这个隐式的Weak<T>
也需要被释放(本质是执行 alloc_ref_count 减一,同时如果没有其他的Weak<T>
,需要顺带释放ArcData<T>
。data
字段,我们使用ManuallyDrop
来替代Option
,我们之前使用None
来表示数据已经被释放,但其实data_ref_count
已经能表达这层意思了,所以我们这里使用ManuallyDrop
来进一步节省内存资源。
std::mem::ManuallyDrop<T>
是一个零成本(zero-cost)包装器。它不改变T
的布局,也不会产生额外的字节,在一些情况下,可以比Option<T>
少一些标记位和字节对齐所占据的额外空间。具体可参考:附录 2. ManuallyDrop<T>。
graph TB subgraph Stack ["栈内存 Stack"] ArcStruct["Arc<T>
ptr: NonNull<ArcData<T>>"] WeakStruct["Weak<T>
ptr: NonNull<ArcData<T>>"] end subgraph Heap ["堆内存 Heap"] ArcDataStruct["ArcData<T>
data_ref_count
alloc_ref_count
data"] end ArcStruct -->|直接指向| ArcDataStruct WeakStruct -->|直接指向| ArcDataStruct classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px class ArcStruct,WeakStruct stackStyle class ArcDataStruct heapStyle
维护引用计数
修改了引用计数的语义后,我们需要重新思考如何管理引用计数。
- Clone:
- 当
Weak<T>
拷贝时,仅增加了弱引用的数量,所以我们依旧只需对alloc_ref_count
进行自增。 - 当
Arc<T>
拷贝时,这个时候,我们就只需要对data_ref_count
进行自增即可。
- 当
- Drop:
- 当
Arc<T>
被释放时,我们需要对data_ref_count
进行自减。另外,如果是最后一个Arc<T>
被释放,我们需要释放data
,同时,我们还需要释放那个代表所有Arc
的隐式Weak
。 - 当
Weak<T>
被释放时,我们仅需减少弱引用的数量,即对alloc_ref_count
进行自减。另外,当最后一个Weak<T>
被释放时,我们还需要负责释放ArcData<T>
。
- 当
具体的实现如下:
1 | impl<T> Clone for Weak<T> { |
对于这个隐式 Weak<T>
,如果你一时无法很好地 Get
到那个点,可以尝试手动写写整个实现的过程,相信多走几遍流程,你会有那种茅塞顿开的感觉!
flowchart TD subgraph Clone["Clone 操作"] WeakClone["Weak<T>::clone()"] ArcClone["Arc<T>::clone()"] WeakClone --> WeakInc["alloc_ref_count.fetch_add(1)"] ArcClone --> ArcInc["data_ref_count.fetch_add(1)"] WeakInc --> WeakCheck{"计数 > usize::MAX/2?"} ArcInc --> ArcCheck{"计数 > usize::MAX/2?"} WeakCheck -->|是| Abort1["std::process::abort()"] ArcCheck -->|是| Abort2["std::process::abort()"] WeakCheck -->|否| WeakNew["返回新的 Weak<T>"] ArcCheck -->|否| ArcNew["返回新的 Arc<T>"] end subgraph Drop["Drop 操作"] WeakDrop["Weak<T>::drop()"] ArcDrop["Arc<T>::drop()"] WeakDrop --> WeakDec["alloc_ref_count.fetch_sub(1, Release)"] ArcDrop --> ArcDec["data_ref_count.fetch_sub(1, Release)"] WeakDec --> WeakDropCheck{"返回值 == 1?
最后一个 Weak?"} ArcDec --> ArcDropCheck{"返回值 == 1?
最后一个 Arc?"} WeakDropCheck -->|是| WeakFence["fence(Acquire)"] WeakDropCheck -->|否| WeakEnd["结束"] ArcDropCheck -->|是| ArcFence["fence(Acquire)"] ArcDropCheck -->|否| ArcEnd["结束"] WeakFence --> WeakFree["释放 ArcData<T>
Box::from_raw(ptr)"] ArcFence --> ArcDataFree["释放 data
ManuallyDrop::drop()"] ArcDataFree --> ImplicitWeak["释放隐式 Weak
drop(Weak { ptr })"] ImplicitWeak --> WeakDrop end subgraph Memory["内存释放顺序"] Step1["① Arc 释放 data 内容"] Step2["② Arc 释放隐式 Weak"] Step3["③ Weak 释放 ArcData 结构"] Step1 --> Step2 Step2 --> Step3 end classDef cloneStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px classDef dropStyle fill:#ffebee,stroke:#c62828,stroke-width:2px classDef memStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px classDef criticalStyle fill:#fce4ec,stroke:#880e4f,stroke-width:3px class WeakClone,ArcClone,WeakInc,ArcInc,WeakNew,ArcNew cloneStyle class WeakDrop,ArcDrop,WeakDec,ArcDec,WeakEnd,ArcEnd dropStyle class WeakFree,ArcDataFree,ImplicitWeak criticalStyle class Step1,Step2,Step3 memStyle
至此,我们重新运行之前的测试用例
arc_should_work()
,可以发现的成功通过的。
下面我们继续来调整 get_mut
、downgrade()
和
upgrade()
。
upgrade()
upgrade()
比较简单,它的规则还是不变,只有当存在
Arc<T>
时,即 data_ref_count>0
时,才能升级成功:
1 | impl<T> Weak<T> { |
get_mut()
接一下我们看
get_mut
,规则也是不变:当前仅当只有一个
Arc<T>
,且不存在 Weak<T>
时,才可以返回可变引用。
1 | // 两个条件 |
代码的逻辑就不赘述了,但是这里有 3 个原子操作的内存顺序我们需要讨论一下!
分别是:
1 | // 获取 Weak<T> 的数量并暂时锁定 |
1 | // 获取 Arc<T> 的数量 |
1 | // 释放 alloc_ref_count |
首先看第 1 个,我们要判断当前是否已经没有 Weak<T>
了,所以我们需要保证看到之前所有 drop(Weak<T>)
的写入操作,所以这里需要建立起一个 happens-before,因此我们之前为
Weak<T>
实现 Drop
trait 的时候,使用的是
Release
,而这里,需要使用 Acquire
来进行配对,建立 happens-before。
然后看第 2 个,这里我们要判断是否仅有一个
Arc<T>
,所以我们需要保证看到之前所有
drop<Arc<T>
的写入操作,因此我们之前为
Arc<T>
实现 Drop
trait 的时候,使用的是
Release
。但是在这里,我们仅需在仅剩 1 个 Arc
的时候,才有必要建立 happens-before,所以当 is_unique=true
时,我们补一个 fence(Acquire)
屏障,来建立跟
drop(Arc<T>)
的 happens-before。
在分析第 3 个之前,我们需要来尝试挖一下当前 get_mut
的漏洞!相信有部分读者在看到上述实现后,跟笔者一样,会有一个疑惑:上述
2
个条件的检查,并不是原子的,这样的检查还安全可靠吗?
比如说:
1 | pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> { |
在上面这个例子中:
- 假设我们已经通过了第一个检查;
- 在进行第二个检查之前的这个间隙中,存在另外一个
Arc<T>
; - 然后它通过
downgrade()
生成了一个Weak<T>
; - 然后再
drop(Arc<T>)
; - 这个时候我们再检查
Arc<T>
的时候,会发现只有 1 个,检查就通过了。但是其实这个时候,存在了其他的Weak<T>
,所以是有问题的!
sequenceDiagram participant A as T1 :get_mut() participant B as T2 :downgrade()+drop(Arc) participant C as ArcData A->>C: CAS alloc_ref 1→MAX ✔ Note over A,B: 非原子窗口 B->>C: clone Weak
alloc_ref++ (Relaxed) B->>C: drop(Arc)
fetch_sub data_ref (Release) A->>C: load data_ref ==1 ✔ A->>C: store alloc_ref MAX→1 (Release) A-->>A: fence(Acquire) → 返回 &mut T(已失效)
所以:我们不能让
downgrade()
在这个间隙中成功执行!这也是我们在前面将
alloc_ref_count
置为
usize::Max
(上锁)的原因,在后面的 downgrade()
中,我们肯定要检查这个值,如果
alloc_ref_count=usize::Max
,就不能 downgrade()
成功,直到 alloc_ref_store(1, _)
的时候,才能
downgrade()
成功。不过这个时候已经晚了!如果是这样,那
is_unique
肯定就不为 true
,所以会返回
None
,这个时候,我们不会返回
&mut T
,所以,危机就解除了!
downgrade()
我们先来看一下 downgrade()
的实现:
1 | impl<T> Arc<T> { |
在 downgrade()
的实现中,我们跟 get_mut
遥相呼应:
如果
alloc_ref_count=usize::MAX
,说明被锁住,这个时候需要自旋等待并重试。std::hint::spin_loop()
会向 CPU 发送特定指令(如 x86 的 pause 或 ARM 的 yield),提示当前处于忙等待状态,有利于优化 CPU 行为。如果
alloc_ref_count
没被锁住,我们尝试进行 CAS,这里成功的时候使用的Acquire
,为什么呢?这是为了跟前面还未讨论的第 3 个原子操作建立 happens-before!
现在我们终于可以来解开这第 3 个原子操作的原子顺序谜团了:
1 | // 释放 alloc_ref_count |
这里我们必须保证跟 download()
的
compare_exchange(_,_,Acquire,_)
建立起 happens-before
关系,即将 alloc_ref_count
置为 1 的结果必须被
downgrade()
所在的线程看到。不然的话,这里
compare_exchange
成功了,将 alloc_ref_count
置为 2 了,但是 get_mut
又将其置为 1 了,就乱套了!
完整代码
到这里我们终于是完成了 v3 版本的优化工作了,真棒!介于篇幅已经够长了,这里就不再贴出完整的代码了,感兴趣的读者可以参阅:hedon-rust-road/conutils/arc。
浅探标准库的 Arc<T>
在最后,我们来看一下标准库的 Arc<T>
是如何实现的,看文章开头说的实现一个可以媲美标准库的
Arc<T>
是不是在吹牛!
标准库(rustc 1.87.0)的 Arc<T>
位于 sync.rs
文件中,我们来看下它的数据结构定义:
1 | pub struct Arc< |
其他一些莫名其妙的标记咱就不管了,映入眼帘可以看到一个
ArcInner
,这不就是咱的 ArcData
吗!点进去看下:
1 | struct ArcInner<T: ?Sized> { |
好家伙!这不就是咱的 ArcData
!
strong
: 对应我们的data_ref_count
weak
: 对应我们的alloc_ref_count
weak
字段上面的注释也揭示了其核心逻辑跟咱是高度一致的!
我们简单看下最重要的 Drop
和 Clone
的实现,因为这涉及到引用计数的维护:
1 | const MAX_REFCOUNT: usize = (isize::MAX) as usize; |
可以看到跟咱前面的实现是一样一样的!为啥?因为 Rust Atomics and Locks 一书的作者Mara Bos 就是 Rust 标准库团队的领导之一!牛逼!
总结
通过三轮迭代,我们不仅实现了一个媲美了标准库的
Arc<T>
,还更进一步体验了 Rust
并发抽象的设计哲学:
- v0 —— 能用:单计数
ArcData<T>
解决了跨线程多所有权,但仍缺乏可变视图与断环能力。 - v1 —— 能改:借助独占
&mut Arc<T>
+ 原子 锁,在 不破坏并发安全 的前提下开放了get_mut
。 - v2 —— 能回收:双计数 +
Weak<T>
消除了父子互指造成的泄漏,展示了 弱引用 在所有权图中的价值。 - v3 —— 更轻巧:将
Weak
独立、用ManuallyDrop
替换Option<T>
,让没有循环引用需求的场景不再为弱引用买单。
下篇我们将尝试实现一个 Mutex<T>
,敬请期待!
Happy Coding! Peace~
附录
1. NonNull<T>
std::ptr::NonNull<T>
是一个
零成本、非空、协变
的裸指针包装器。它本质上只是把一个原始指针塞进
#[repr(transparent)]
的新类型中,但强制保证
绝不为空,因此可以拿到「空值当作枚举判别位」这份额外信息
—— Option<NonNull<T>>
与一个普通指针占用同样大小。
重要特性:
- 永远非空:创建时若传入空指针即触发
未定义行为。编译器可依赖这一点做优化,例如把
Option<NonNull<T>>
合并为一个指针宽度。 - 协变:与
*mut T
不同,NonNull<T>
可以在U: Deref<Target = T>
的场景下安全向子类型转换(因为禁止空值带来了额外保证),这使它非常适合构建自定义智能指针。 - 无自动 Drop:
NonNull
只存地址,不持有所有权;销毁与释放内存仍由外部逻辑决定(如Box::from_raw
、Vec::dealloc
等)。
我们将其与裸指针、 Box<T>
智能指针和引用
&T
做一个简单的比较:
特性 | *mut T / *const T |
NonNull<T> |
Box<T> / &T |
---|---|---|---|
是否允许为空 | ✅ | ❌ 必须非空 | 不适用 |
协变性 | *mut 不协变 |
协变(可安全向子类型转换) | &T 协变 |
Option 优化 | ❌ 多 1 B 判别字节 | ✅ 与裸指针同尺寸 | 已自带优化 |
所有权 / drop 责任 | 没有 | 没有(纯指针) | 有 |
Send / Sync | 与 T 无关 |
默认 !Send !Sync |
取决于 T |
常见用途 | FFI、底层算法 | 智能指针内部、侵入式容器、裁掉空判 | 高层所有权模型 |
关键的 API:
分组 | 代表方法 | 说明 |
---|---|---|
构造 | NonNull::new(ptr) →
Option<NonNull<T>>
unsafe { NonNull::new_unchecked(ptr) }
NonNull::dangling() |
安全 / 不安全 / 占位 |
引用视图 | unsafe fn as_ref /
as_mut |
把裸指针临时借用成 &T /
&mut T |
裸指针互转 | fn as_ptr |
取回 *mut T ,解引用仍需 unsafe |
类型转换 | fn cast<U> fn cast_mut /
cast_const |
保留地址,换类型或可变性 |
地址运算 | unsafe fn add / byte_add / sub / offset |
指针算术,与 ptr::add 族一致 |
切片助手 | NonNull::slice_from_raw_parts(data, len)
fn len() |
1.70+ 稳定,为 [T] 提供非空裸切片构造 (rustwiki.org) |
⚠️ 任何把
NonNull
重新解释为引用或解引用的操作,都必须在unsafe
块里手动保证内存有效性与别名规则。
典型使用场景:
场景 | 作用 |
---|---|
智能指针内部实现
(Box /Rc /Arc ) |
需要 非空 原始指针存放被管对象,且要在 Option
等场景下节省空间 |
自研侵入式链表 / 红黑树 | 节点自身持有前后指针字段
NonNull<Node<T>> ,天然避免空判分支 |
惰性初始化 / Vec::new |
先用 NonNull::dangling()
占位,等真正分配后再写入正确地址 |
FFI | C API 明确保证参数永不为空时,用 NonNull
在类型层面表达前置条件 |
自引用结构(Pin 工作区) | 在完成真正初始化前使用 NonNull::dangling()
保存指向自身字段的指针 |
2. ManuallyDrop<T>
std::mem::ManuallyDrop<T>
是一个零成本(zero-cost)包装器。把值包在 ManuallyDrop
里会告诉编译器:请不要在作用域结束时自动调用它的
Drop
实现,什么时候释放(或是否释放)由我手动决定。
ManuallyDrop<T>
只是把 "是否自动 drop" 这一语义从编译器搬到了程序员身上;- 它不改变
T
的布局,也不会产生额外的字节,因此是零开销; - unsafe 责任:你必须保证一份值恰好析构一次(不能漏掉,也不能多调)。
它的核心 API 如表所示:
API | 作用 | 重要注意点 |
---|---|---|
ManuallyDrop::new(value) |
把 T 包装成
ManuallyDrop<T> ,关闭自动析构 |
零开销;之后需显式 drop 或移动走 |
ManuallyDrop::drop(&mut self)
(unsafe) |
手动触发内部值的析构 | 必须保证之后不会再访问 / 再 drop |
ManuallyDrop::take(&mut self)
(unsafe) |
从包装里"搬走"值(等价于 ptr::read ) |
v 里留下未初始化内存,不能再用或再 drop |
ManuallyDrop::into_inner(self)
(unsafe) |
消费 ManuallyDrop 返回内部值 |
取得所有权后,原包装已被移走(不会 double-drop) |