系列文章:
- Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序
- Rust 原理丨从汇编角度看原子操作
- Rust 实战丨手写一个 SpinLock
- Rust 实战丨手写一个 oneshot channel
- Rust 实战丨手写一个 Arc 👈 本篇
- Rust 原理丨操作系统并发原语
- Rust 实战丨手写一个 Mutex
- Rust 实战丨手写一个 Condvar
- Rust 实战丨手写一个 RwLock
继上篇 Rust
实战丨手写一个 oneshot channel,本篇我们继续参考 Rust Atomics and Locks
一书,来实现一个 Arc。
在本章开始之前,我们假设你已经:
- 熟悉并理解 Rust 的各种原子操作。
- 阅读过 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序,并理解内存顺序和内存屏障的原理和使用方法。
- 理解 Rust
UnsafeCell<T>提供的内部可变性允许我们在持有共享引用&的时候可以对数据进行修改。
Arc 简介
Arc(Atomic Reference Counted)是 Rust
标准库里位于 std::sync 模块中的智能指针,用于
在多个线程之间安全地共享只读数据。和只适用于单线程场景的
Rc<T> 不同,Arc<T>
的引用计数增减操作使用原子指令,从而保证跨线程的内存安全。
为什么需要 Arc?
1 | let p = Person { age: 18, name: "hedon".to_string(), address: "China".to_string() }; |
问题:thread::spawn 要求
'static 生命周期,但 &p
只是栈上变量的借用。
解决:使用 Arc::new(p)
把数据移到堆上,通过原子引用计数实现多所有权:
1 | let p = Arc::new(Person { /* ... */ }); |
读完本篇你能学到什么
- 原子操作的内存顺序选择:通过引用计数这个具体例子,理解
Relaxed / Acquire / Release / fence的使用场景。 - 弱引用解决循环引用:
Weak<T>的设计原理和在图结构中的应用。 - Arc::get_mut 的安全性保证:理解「非原子两步校验」的巧妙设计。
- 零成本抽象的实现细节:
UnsafeCell+ManuallyDrop相比Option<T>的优势。
v0: 基础引用计数
数据结构
我们先来分析一下基本的数据结构该如何定义,在之前的 Rust
实战丨手写一个 SpinLock,我们讲到了 C++/Rust 常用的并发编程方式
RAII(Resource Acquisition Is
Initialization,资源获取即初始化),其核心思想是:在对象构造函数中获取资源,在析构函数中释放资源。上面的案例中,我们在初始化
Person 的时候其实也是应用了这种思路。
1 | let p = Arc::new(Person { |
所以我们的自定义 Arc 需要泛型
T,使其可以承载不同的数据类型的数据
data,同时为了做引用计数,它需要一个数值类型
ref_count 来计数,为了并发安全,我们可以选择原子类型
AtomicUsize。
在 Arc 中,我们需要自己管理 data
的生命周期,除了使用裸指针 *mut T 或 *const T
之外,我们可以使用 std::ptr::NonNull<T>
,它是一个零成本、保证非空、支持协变(可安全向子类型转换)的裸指针包装器,具体可参考附录
1. NonNull<T>。
综上,我们可以定义以下数据结构:
1 | struct ArcData<T> { |
- 我们定义了
ArcData和Arc两个结构,其中ArcData包含引用计数ref_count和实际数据data。 - 在
Arc中,我们使用NonNull来管理ArcData的生命周期。初始化时,先用Box::new()在堆上分配内存,再通过Box::leak()放弃Box<T>的所有权,交由Arc自行管理。
graph TB
subgraph Stack ["栈内存 Stack"]
ArcStruct["Arc<T>
ptr: NonNull<ArcData<T>>"]
end
subgraph Heap ["堆内存 Heap"]
ArcDataStruct["ArcData<T>
ref_count: AtomicUsize(1)
data: T"]
end
ArcStruct -->|指向| ArcDataStruct
subgraph Process ["内存分配过程"]
Step1["Box::new(ArcData)"]
Step2["Box::leak()"]
Step3["NonNull::from()"]
Step1 --> Step2
Step2 --> Step3
end
classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
classDef processStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px
class ArcStruct stackStyle
class ArcDataStruct heapStyle
class Step1,Step2,Step3 processStyle
另外,跨线程发送 Arc<T> 会导致 T
对象被共享,即 T 需要满足 Sync
trait,而跨线程发送 Arc<T>
也会导致需要由另外一个线程来释放 T,所以需要 T
满足 Send trait。所以只有当 T 满足
Send+Sync 的时候,Arc<T> 才是
Send 的,对于 Sync 也是同理,因为我们可以为
Arc<T> 分别实现 Sync 和
Send trait:
1 | unsafe impl<T: Send + Sync> Send for Arc<T> {} |
为了能够便捷的获取 data,我们先为
Arc<T> 实现一个 data() 用于获取
ArcData<T>,同时为其实现 Deref
trait,用于像指针一样无感操作 data: T:
1 | impl<T> Arc<T> { |
维护引用计数
基础部分我们已经铺垫完了,现在我们需要来实现 2 个最重要的 trait 了:
- Clone:
Arc引用计数的关键,在每次clone()的时候,我们不拷贝data,而是让ref_count自增,进行引用计数。 - Drop: 在
Arc<T>实例离开作用域的时候,我们需要让ref_count自减,同时在最后一个Arc<T>被销毁时,我们需要主动释放ArcData<T>的内存资源。
我们先来实现 Clone:
1 | impl<T> Clone for Arc<T> { |
因为这里没有其他原子操作需要跟当前操作建立严格的
happens-before 关系,所以这里我们可以使用最松的
Relaxed 内存顺序。
接下来看下 Drop:
1 | impl <T> Drop for Arc<T> { |
对于
Drop,我们就需要好好想一想需要什么内存顺序约束了。在最后一个
drop 的时候,我们有 2 个目标:
- 不能过早释放:确保在引用计数减到 0 并销毁对象之前,没有别的线程仍在使用这份数据;
- 要看得见别人写的东西:如果别的线程在它们各自的
drop里面对共享对象做了写入,最后一个线程做析构时必须"看到"这些写入,否则就可能出现数据竞争或次序错误。
换言之,我们需要最后一个 fetch_sub 跟前面其他每一个
fetch_sub 都建立起 happens before 关系,也即我们需要一对
Release 和 Acquire 来保证 happens-before。
这里简单回顾一下 Release 和
Acquire,不熟悉的读者可以参阅:Rust 原理丨聊一聊
Rust 的 Atomic 和内存顺序。
Release: 作用于写操作(store),确保该操作之前的所有内存访问不会被重排到这个 Release 操作之后。Acquire: 作用于读操作(load),确保该操作之后的所有内存访问不会被重排到这个 Acquire 操作之前。- 当一个线程通过
Acquire读取到另一个线程通过Release写入的值时,会建立一个 happens-before 关系:线程 A 中 Release 写入之前的所有内存写操作,对于线程 B 中 Acquire 读取之后的所有内存读操作都是可见的。
Release-Sequence 概念补充:当多个线程对同一个原子变量执行 Release 操作时,这些操作会形成一个 "release-sequence"。后续任何一个 Acquire 操作读取到这个序列中的任意值,都能与整个序列建立 happens-before 关系。这正是为什么我们的
fence(Acquire)能够与之前所有线程的fetch_sub(Release)形成同步的关键。
我们当然可以使用 Release 和 Acquire
的结合体 AcqRel
来一步到位解决这个问题。不过考虑到只有最后一个 drop
需要满足这个关系,我们可以尝试做得更优雅一些。
在这种仅需在临界值保证 happens-before
的场景下,我们都可以单独在临界情况下使用一个 fence 来建立起
happens-before。
具体来说:
- 对于非最终
drop:我们只需要使用Release,即fetch_sub(1, Ordering::Release),它保证了别的线程如果最终做"最后一次 drop",只要它对相同原子执行一条 Acquire 操作,它就能同步到前面所有线程对数据做过的改动。 - 对于最终
drop:在调用fetch_sub的时候我们仍需要Release语义,但是调用时,我们并不知道自己是不是"最后那个线程"。当fetch_sub返回1的时候,说明我们是"最后那个线程"。这个时候,我们需要建立一个fetch(Acquire),与之前的所有Release形成配对,确保看到之前所有的历史写入,这个时候我们才能确定已经没有别的线程在使用数据了,我们才可以安全地销毁对象。
如下图所示;
sequenceDiagram
participant A as Thread A
participant B as Thread B
participant C as Thread C
(最后 drop)
%% 普通写入
A->>A: write shared data …
B->>B: write shared data …
%% 非最终 drop:Release 写
A->>A: fetch_sub (Release) ⬅ 计数 n→n-1
B->>B: fetch_sub (Release) ⬅ 计数 n-1→n-2
%% 形成 release-sequence
Note over A,B: 这两次 Release 写组成
同一条 release-sequence
%% 最终 drop:Release 写 & 返回 1
C->>C: fetch_sub (Release) ⬅ 返回 1 → 计数 0
%% Acquire fence 同步
C-->>C: fence (Acquire)
【接收 release-sequence】
%% 安全析构
C->>C: drop(Box::from_raw)
%% 结果说明
Note over C: fence (Acquire) 使 A/B
对共享数据的写必定
在析构前可见
经过这么一顿分析后,我们最终的 Drop 实现如下:
1 | impl<T> Drop for Arc<T> { |
实现逻辑:
- 所有 drop 都用
Release语义减引用计数,为可能的"最后一次 drop"做准备 - 只有最后一次 drop(返回值为 1)才需要额外的
fence(Acquire)与之前所有的 Release 建立 happens-before - 安全析构:现在可以确保看到所有历史写入,没人再持有引用
完整代码
自此,我们第一个版本的 Arc
就实现完毕了,我们来看一下最终完成的代码:
1 | use std::sync::atomic::{AtomicUsize, Ordering, fence}; |
单元测试
写个单元测试验证一下:
1 |
|
执行结果是 ok 的:
1 | running 1 test |
v1: 实现 get_mut 获取可变引用
按照前面版本的实现,我们是不能为 Arc<T> 实现
DerefMut trait 的,因为我们不能无条件地提供
&mut T 可变引用,因为它可能同时被其他的
Arc<T> 所访问中。
不过,当满足一定条件的时候,我们还是可以提供 &mut T
可变引用的。具体来说,需要满足 2 个条件:
- 使用
&mut Arc<T>保证当前Arc<T>只有一个地方在使用; - 需要确保全局只有一个
Arc<T>。
为了避免跟 DerefMut
混淆,我们将其声明为一个静态方法,并将 Arc<T>
作为参数,同时因为只有满足特定条件的情况下,才能返回
&mut T,所以我们使用 Option
作为返回值。如下:
1 | pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> { |
这里我们在确定只有一个
Arc实例的时候,使用fetch(Acquire)来跟所有其他Arc执行drop()的fetch_sub(Release)建立 happens-before 关系。因为此时
ref_count=1,说明仅有当前Arc这一个实例,而get_mut需要的又是独占引用,所以当前Arc不可能再被拿去做clone()操作,所以在这个情况下,是可以保证有且仅有一个Arc实例,所以我们是可以安全返回&mut T的。返回的
&mut T会隐式地借用参数&mut Arc<T>的生命周期,而 Rust 不允许可变引用和只读引用交叉存在,所以当前仅剩的这个Arc,直到&mut T作用域结束之前,都是不可用的,所以在那之前,不会再被拿去执行clone()和Deref等操作,所以是安全的。1
2
3
4
5
6
7
8
9
fn get_mut_should_be_safe() {
let mut x = Arc::new(vec![]);
let v = Arc::get_mut(&mut x).unwrap();
v.push(1);
let y = x.clone(); // <---- cannot borrow `x` as immutable
v.push(2);
}如上面这个例子,就会报错:
1
2
3
4
5
6
7
8
9
10error[E0502]: cannot borrow `x` as immutable because it is also borrowed as mutable
src/arc.rs:117:17
|
114 | let v = Arc::get_mut(&mut x).unwrap();
| ------ mutable borrow occurs here
...
117 | let y = x.clone();
| ^ immutable borrow occurs here
118 | v.push(2);
| - mutable borrow later used here
v2: 弱指针 Weak<T> 解决循环引用
循环引用问题分析
graph TD
A --> B;
A --> C;
引用计数在各种数据结构的表示中非常有用,如上图所示的树形结构,当释放 A 的时候,因为 B 和 C 不再被引用,所以也可以顺带释放了。
但是,如果 B 和 C 也持有对 A 的引用,即形成了循环引用,那按照我们之前的实现,A、B、C 都将永远不会被释放了,因为它们的引用计数永不为 0,哪怕它们三者均不再被使用。如下图所示:
graph TD
A <--> B;
A <--> C;
B -.-> A;
C -.-> A;
为了应对这种场景,Rust
的标准库中提出的解决方案是:Weak<T>,也称弱指针。T
可以在 Arc<T> 和 Weak<T>
之间共享,当所有的 Arc<T>
都失效的时候,T 被释放,无论此时是否有
Weak<T> 的存在。
如下图所示,父节点对子节点使用的是强引用,确保了父节点存活的时候,子节点都存在。而子节点对父节点使用的是弱引用,只即保留了子节点回溯父节点的能力,也不会阻止父节点的释放。
当父节点被释放时,所有强引用计数归零,节点可以依次被释放。
graph TD
A -- Arc --> B;
A -- Arc --> C;
B -.->|Weak| A;
C -.->|Weak| A;
数据结构
OK,做了这么多铺垫后,我们来思考一下现在的 ArcData
该如何调整。
- 之前我们用
ref_count做引用计数,它代表的都是强引用,现在我们需要记录弱引用数量的相关字段。 - 当只有弱引用的时候,
data就已经被释放了,我们需要使用None来表示这种情况,所以data应该是一个Option<T>类型。 - 当
ArcData<T>被一个Arc和多个Weak共享时,释放最后一个Arc时,我们仅拥有&ArcData<T>不可变引用,这个时候我们需要将其从Some(T)置为None,即要在不可变引用上实现修改,这就涉及到了前几篇提到的:内部可变性。UnsafeCell 是 Rust 提供的一个内部可变性工具类型,它包装一个数据,使得即使在只有不可变引用的情况下也可以进行修改(当然需要在unsafe块中操作)。所以我们需要将data再用UnsafeCell包一层,以满足此场景的需求。
综上,最新的 ArcData 定义如下:
1 | struct ArcData<T> { |
然后我们需要定义一个新结构 Weak<T>
,用来表示弱引用,假定这个时候,我们将维护 ArcData
存活的职责交给 Weak<T>,那就可以将
ArcData 转给 Weak<T> 持有,然后在
Arc<T> 中持有一个 Weak<T>:
1 | struct ArcData<T> { |
在上述代码中,除了数据结构之外,我们做了 3 点调整:
- 只需要为
Weak<T>实现Sync和Sendtrait,这个时候Arc<T>就会被自动实现这 2 个 triat。 data(&self)辅助函数,移到了Weak<T>身上。Arc<T>要从Deref获取&T,需要先经过一道Weak<T>。
graph LR
subgraph Stack ["栈内存 Stack"]
ArcStruct["Arc<T>
weak: Weak<T>"]
WeakStruct["Weak<T>
ptr: NonNull<ArcData<T>>"]
end
subgraph Heap ["堆内存 Heap"]
ArcDataStruct["ArcData<T>
data_ref_count: AtomicUsize(1)
alloc_ref_count: AtomicUsize(1)
data: UnsafeCell<Option<T>>"]
end
ArcStruct -->|包含| WeakStruct
WeakStruct -->|指向| ArcDataStruct
classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
classDef processStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px
classDef refStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
class ArcStruct,WeakStruct stackStyle
class ArcDataStruct heapStyle
class Step1,Step2,Step3,Step4,Step5 processStyle
class DataRef,AllocRef,DataContent refStyle
维护引用计数
现在重点来了,我们需要来思考 Weak<T> 和
Arc<T> 的 Clone 和 Drop
该如何实现。其实重点就是该如何管理 alloc_ref_count
和 data_ref_count 的计数。
再次明确下我们的定义:
data_ref_count: 代表的是强引用Arc<T>的数量。alloc_ref_count: 代表的是Arc<T>+Weak<T>的数量。
因此:
- Clone:
- 当
Weak<T>拷贝时,仅增加了弱引用的数量,所以我们只需对alloc_ref_count进行自增。 - 当
Arc<T>拷贝时,不仅需要拷贝内部的Weak<T>,还增加了强引用的数量,所以我们还需要对data_ref_count进行自增。
- 当
- Drop:
- 当
Arc<T>被释放时,不仅释放了其内部的Weak<T>,还减少了一个强引用,所以需要对data_ref_count进行自减。另外,如果是最后一个Arc<T>被释放,我们需要将ArcData<T>.data置为None。 - 当
Weak<T>被释放时,我们仅需减少弱引用的数量,即对alloc_ref_count进行自减。另外,当最后一个Weak<T>被释放时(此时肯定也没有Arc<T>了),我们还需要负责释放ArcData<T>。
- 当
综上,我们的实现如下:
1 | impl<T> Clone for Weak<T> { |
这里考虑到实际场景中的内存限制,我们对引用计数进行了简单的限制,当其超过
usize::Max/2的时候,就执行std::process:abort()让整个进程崩溃。
get_mut()
接下来我们来考虑下 get_mut(arc: &mut Arc<T>)
该如何修改。什么时候可以返回
&mut T,很明显,当且仅当只有一个
Arc<T> 的时候才可以。
故 get_mut 的实现修改如下:
1 | pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> { |
到这里,我们执行之前的 arc_should_work
测试用例,可以发现是顺利通过的。
dowgrade()
回顾下面这张图,为了提供 A 的 Weak<T>
指针,我们需要给 Arc<T> 提供一个
downgrade()
方法,用于将强引用降为弱引用,这是必然可以成功的。同时,当我们也可以为
Weak<T> 提供一个 upgrade()
方法,用于持有弱引用的情况下可以尝试访问数据,当然这未必能成功。
graph TD
A -- Arc --> B;
A -- Arc --> C;
B -.->|Weak| A;
C -.->|Weak| A;
downgrade() 比较简单,我们直接 clone() 一个
Weak<T> 就可以了:
1 | impl<T> Arc<T> { |
upgrade()
upgrade() 就比较复杂了,只有当存在 Arc
的时候,data 才没被释放,这个时候,才能返回升级后的
Arc<T>,即要求 data_ref_count>0。
1 | impl<T> Weak<T> { |
我们来写一下测试用例,验证使用了 Weak<T>
后,我们的资源能否正确释放。在这之前,我们先写一个非
Weak<T> 版本的,看看资源是否真的没有被释放:
1 |
|
执行上述用例后,我们可以发现 NUM_DROPS 还是 0,即
root/child1/child2 均没有被释放资源。
你可以将 parent
的引用修改为弱引用,然后再重新执行测试用例,就会发现
root/child1/child2 的资源都被正确释放了。
1 | struct Node { |
v3: 分离强弱引用,避免无用消耗
上个版本我们通过引入了弱引用
Weak<T>,成功解决了循环引用的问题,这是个非常大的进步!
不过我们仍然有进一步优化的空间,可以观察到,Arc<T>
的每次拷贝,都会伴随一次拷贝
Weak<T>,但是,很多时候,我们其实没有循环引用关系的,也即我们并不是每一次都需要
Weak<T>,所以上个版本的实现,其实是有很多的浪费的。
数据结构
所以我们可以考虑将 Weak<T> 从
Arc<T> 中分离出来:
1 | pub struct Arc<T> { |
同时我们需要对 ArcData<T>
结构中的引用技术进行重新定义:
1 | struct ArcData<T> { |
我们总共做了 3 个重要的调整:
Arc<T>不再包含Weak<T>,而是各自独立,不过它们之前会共享底层的ArcData<T>。alloc_ref_count的语义,从"Arc + Weak 的数量",变成"Weak 的数量,当存在 Arc 时,额外加 1"。换言之,对于所有的
Arc<T>,都共有一个隐式的Weak<T>,当释放最后一个Arc<T>的时候,这个隐式的Weak<T>也需要被释放(本质是执行 alloc_ref_count 减一,同时如果没有其他的Weak<T>,需要顺带释放ArcData<T>。data字段,我们使用ManuallyDrop来替代Option,我们之前使用None来表示数据已经被释放,但其实data_ref_count已经能表达这层意思了,所以我们这里使用ManuallyDrop来进一步节省内存资源。
std::mem::ManuallyDrop<T>是一个零成本(zero-cost)包装器。它不改变T的布局,也不会产生额外的字节,在一些情况下,可以比Option<T>少一些标记位和字节对齐所占据的额外空间。具体可参考:附录 2. ManuallyDrop<T>。
graph TB
subgraph Stack ["栈内存 Stack"]
ArcStruct["Arc<T>
ptr: NonNull<ArcData<T>>"]
WeakStruct["Weak<T>
ptr: NonNull<ArcData<T>>"]
end
subgraph Heap ["堆内存 Heap"]
ArcDataStruct["ArcData<T>
data_ref_count
alloc_ref_count
data"]
end
ArcStruct -->|直接指向| ArcDataStruct
WeakStruct -->|直接指向| ArcDataStruct
classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px
classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
class ArcStruct,WeakStruct stackStyle
class ArcDataStruct heapStyle
维护引用计数
修改了引用计数的语义后,我们需要重新思考如何管理引用计数。
- Clone:
- 当
Weak<T>拷贝时,仅增加了弱引用的数量,所以我们依旧只需对alloc_ref_count进行自增。 - 当
Arc<T>拷贝时,这个时候,我们就只需要对data_ref_count进行自增即可。
- 当
- Drop:
- 当
Arc<T>被释放时,我们需要对data_ref_count进行自减。另外,如果是最后一个Arc<T>被释放,我们需要释放data,同时,我们还需要释放那个代表所有Arc的隐式Weak。 - 当
Weak<T>被释放时,我们仅需减少弱引用的数量,即对alloc_ref_count进行自减。另外,当最后一个Weak<T>被释放时,我们还需要负责释放ArcData<T>。
- 当
具体的实现如下:
1 | impl<T> Clone for Weak<T> { |
对于这个隐式 Weak<T>,如果你一时无法很好地 Get
到那个点,可以尝试手动写写整个实现的过程,相信多走几遍流程,你会有那种茅塞顿开的感觉!
flowchart TD
subgraph Clone["Clone 操作"]
WeakClone["Weak<T>::clone()"]
ArcClone["Arc<T>::clone()"]
WeakClone --> WeakInc["alloc_ref_count.fetch_add(1)"]
ArcClone --> ArcInc["data_ref_count.fetch_add(1)"]
WeakInc --> WeakCheck{"计数 > usize::MAX/2?"}
ArcInc --> ArcCheck{"计数 > usize::MAX/2?"}
WeakCheck -->|是| Abort1["std::process::abort()"]
ArcCheck -->|是| Abort2["std::process::abort()"]
WeakCheck -->|否| WeakNew["返回新的 Weak<T>"]
ArcCheck -->|否| ArcNew["返回新的 Arc<T>"]
end
subgraph Drop["Drop 操作"]
WeakDrop["Weak<T>::drop()"]
ArcDrop["Arc<T>::drop()"]
WeakDrop --> WeakDec["alloc_ref_count.fetch_sub(1, Release)"]
ArcDrop --> ArcDec["data_ref_count.fetch_sub(1, Release)"]
WeakDec --> WeakDropCheck{"返回值 == 1?
最后一个 Weak?"}
ArcDec --> ArcDropCheck{"返回值 == 1?
最后一个 Arc?"}
WeakDropCheck -->|是| WeakFence["fence(Acquire)"]
WeakDropCheck -->|否| WeakEnd["结束"]
ArcDropCheck -->|是| ArcFence["fence(Acquire)"]
ArcDropCheck -->|否| ArcEnd["结束"]
WeakFence --> WeakFree["释放 ArcData<T>
Box::from_raw(ptr)"]
ArcFence --> ArcDataFree["释放 data
ManuallyDrop::drop()"]
ArcDataFree --> ImplicitWeak["释放隐式 Weak
drop(Weak { ptr })"]
ImplicitWeak --> WeakDrop
end
subgraph Memory["内存释放顺序"]
Step1["① Arc 释放 data 内容"]
Step2["② Arc 释放隐式 Weak"]
Step3["③ Weak 释放 ArcData 结构"]
Step1 --> Step2
Step2 --> Step3
end
classDef cloneStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef dropStyle fill:#ffebee,stroke:#c62828,stroke-width:2px
classDef memStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px
classDef criticalStyle fill:#fce4ec,stroke:#880e4f,stroke-width:3px
class WeakClone,ArcClone,WeakInc,ArcInc,WeakNew,ArcNew cloneStyle
class WeakDrop,ArcDrop,WeakDec,ArcDec,WeakEnd,ArcEnd dropStyle
class WeakFree,ArcDataFree,ImplicitWeak criticalStyle
class Step1,Step2,Step3 memStyle
至此,我们重新运行之前的测试用例
arc_should_work(),可以发现的成功通过的。
下面我们继续来调整 get_mut、downgrade() 和
upgrade()。
upgrade()
upgrade() 比较简单,它的规则还是不变,只有当存在
Arc<T> 时,即 data_ref_count>0
时,才能升级成功:
1 | impl<T> Weak<T> { |
get_mut()
接一下我们看
get_mut,规则也是不变:当前仅当只有一个
Arc<T>,且不存在 Weak<T>
时,才可以返回可变引用。
1 | // 两个条件 |
代码的逻辑就不赘述了,但是这里有 3 个原子操作的内存顺序我们需要讨论一下!
分别是:
1 | // 获取 Weak<T> 的数量并暂时锁定 |
1 | // 获取 Arc<T> 的数量 |
1 | // 释放 alloc_ref_count |
首先看第 1 个,我们要判断当前是否已经没有 Weak<T>
了,所以我们需要保证看到之前所有 drop(Weak<T>)
的写入操作,所以这里需要建立起一个 happens-before,因此我们之前为
Weak<T> 实现 Drop trait 的时候,使用的是
Release,而这里,需要使用 Acquire
来进行配对,建立 happens-before。
然后看第 2 个,这里我们要判断是否仅有一个
Arc<T>,所以我们需要保证看到之前所有
drop<Arc<T> 的写入操作,因此我们之前为
Arc<T> 实现 Drop trait 的时候,使用的是
Release。但是在这里,我们仅需在仅剩 1 个 Arc
的时候,才有必要建立 happens-before,所以当 is_unique=true
时,我们补一个 fence(Acquire) 屏障,来建立跟
drop(Arc<T>) 的 happens-before。
在分析第 3 个之前,我们需要来尝试挖一下当前 get_mut
的漏洞!相信有部分读者在看到上述实现后,跟笔者一样,会有一个疑惑:上述
2
个条件的检查,并不是原子的,这样的检查还安全可靠吗?
比如说:
1 | pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> { |
在上面这个例子中:
- 假设我们已经通过了第一个检查;
- 在进行第二个检查之前的这个间隙中,存在另外一个
Arc<T>; - 然后它通过
downgrade()生成了一个Weak<T>; - 然后再
drop(Arc<T>); - 这个时候我们再检查
Arc<T>的时候,会发现只有 1 个,检查就通过了。但是其实这个时候,存在了其他的Weak<T>,所以是有问题的!
sequenceDiagram
participant A as T1 :get_mut()
participant B as T2 :downgrade()+drop(Arc)
participant C as ArcData
A->>C: CAS alloc_ref 1→MAX ✔
Note over A,B: 非原子窗口
B->>C: clone Weak
alloc_ref++ (Relaxed)
B->>C: drop(Arc)
fetch_sub data_ref (Release)
A->>C: load data_ref ==1 ✔
A->>C: store alloc_ref MAX→1 (Release)
A-->>A: fence(Acquire) → 返回 &mut T(已失效)
所以:我们不能让
downgrade()
在这个间隙中成功执行!这也是我们在前面将
alloc_ref_count 置为
usize::Max(上锁)的原因,在后面的 downgrade()
中,我们肯定要检查这个值,如果
alloc_ref_count=usize::Max,就不能 downgrade()
成功,直到 alloc_ref_store(1, _) 的时候,才能
downgrade() 成功。不过这个时候已经晚了!如果是这样,那
is_unique 肯定就不为 true,所以会返回
None,这个时候,我们不会返回
&mut T,所以,危机就解除了!
downgrade()
我们先来看一下 downgrade() 的实现:
1 | impl<T> Arc<T> { |
在 downgrade() 的实现中,我们跟 get_mut
遥相呼应:
如果
alloc_ref_count=usize::MAX,说明被锁住,这个时候需要自旋等待并重试。std::hint::spin_loop()会向 CPU 发送特定指令(如 x86 的 pause 或 ARM 的 yield),提示当前处于忙等待状态,有利于优化 CPU 行为。如果
alloc_ref_count没被锁住,我们尝试进行 CAS,这里成功的时候使用的Acquire,为什么呢?这是为了跟前面还未讨论的第 3 个原子操作建立 happens-before!
现在我们终于可以来解开这第 3 个原子操作的原子顺序谜团了:
1 | // 释放 alloc_ref_count |
这里我们必须保证跟 download() 的
compare_exchange(_,_,Acquire,_) 建立起 happens-before
关系,即将 alloc_ref_count 置为 1 的结果必须被
downgrade() 所在的线程看到。不然的话,这里
compare_exchange 成功了,将 alloc_ref_count
置为 2 了,但是 get_mut 又将其置为 1 了,就乱套了!
完整代码
到这里我们终于是完成了 v3 版本的优化工作了,真棒!介于篇幅已经够长了,这里就不再贴出完整的代码了,感兴趣的读者可以参阅:hedon-rust-road/conutils/arc。
浅探标准库的 Arc<T>
在最后,我们来看一下标准库的 Arc<T>
是如何实现的,看文章开头说的实现一个可以媲美标准库的
Arc<T> 是不是在吹牛!
标准库(rustc 1.87.0)的 Arc<T> 位于 sync.rs
文件中,我们来看下它的数据结构定义:
1 | pub struct Arc< |
其他一些莫名其妙的标记咱就不管了,映入眼帘可以看到一个
ArcInner,这不就是咱的 ArcData
吗!点进去看下:
1 | struct ArcInner<T: ?Sized> { |
好家伙!这不就是咱的 ArcData !
strong: 对应我们的data_ref_countweak: 对应我们的alloc_ref_count
weak
字段上面的注释也揭示了其核心逻辑跟咱是高度一致的!
我们简单看下最重要的 Drop 和 Clone
的实现,因为这涉及到引用计数的维护:
1 | const MAX_REFCOUNT: usize = (isize::MAX) as usize; |
可以看到跟咱前面的实现是一样一样的!为啥?因为 Rust Atomics and Locks 一书的作者Mara Bos 就是 Rust 标准库团队的领导之一!牛逼!
总结
通过三轮迭代,我们不仅实现了一个媲美了标准库的
Arc<T> ,还更进一步体验了 Rust
并发抽象的设计哲学:
- v0 —— 能用:单计数
ArcData<T>解决了跨线程多所有权,但仍缺乏可变视图与断环能力。 - v1 —— 能改:借助独占
&mut Arc<T>+ 原子 锁,在 不破坏并发安全 的前提下开放了get_mut。 - v2 —— 能回收:双计数 +
Weak<T>消除了父子互指造成的泄漏,展示了 弱引用 在所有权图中的价值。 - v3 —— 更轻巧:将
Weak独立、用ManuallyDrop替换Option<T>,让没有循环引用需求的场景不再为弱引用买单。
下篇我们将尝试实现一个 Mutex<T>,敬请期待!
Happy Coding! Peace~
附录
1. NonNull<T>
std::ptr::NonNull<T> 是一个
零成本、非空、协变
的裸指针包装器。它本质上只是把一个原始指针塞进
#[repr(transparent)] 的新类型中,但强制保证
绝不为空,因此可以拿到「空值当作枚举判别位」这份额外信息
—— Option<NonNull<T>>
与一个普通指针占用同样大小。
重要特性:
- 永远非空:创建时若传入空指针即触发
未定义行为。编译器可依赖这一点做优化,例如把
Option<NonNull<T>>合并为一个指针宽度。 - 协变:与
*mut T不同,NonNull<T>可以在U: Deref<Target = T>的场景下安全向子类型转换(因为禁止空值带来了额外保证),这使它非常适合构建自定义智能指针。 - 无自动 Drop:
NonNull只存地址,不持有所有权;销毁与释放内存仍由外部逻辑决定(如Box::from_raw、Vec::dealloc等)。
我们将其与裸指针、 Box<T> 智能指针和引用
&T 做一个简单的比较:
| 特性 | *mut T / *const T |
NonNull<T> |
Box<T> / &T |
|---|---|---|---|
| 是否允许为空 | ✅ | ❌ 必须非空 | 不适用 |
| 协变性 | *mut 不协变 |
协变(可安全向子类型转换) | &T 协变 |
| Option 优化 | ❌ 多 1 B 判别字节 | ✅ 与裸指针同尺寸 | 已自带优化 |
| 所有权 / drop 责任 | 没有 | 没有(纯指针) | 有 |
| Send / Sync | 与 T 无关 |
默认 !Send !Sync |
取决于 T |
| 常见用途 | FFI、底层算法 | 智能指针内部、侵入式容器、裁掉空判 | 高层所有权模型 |
关键的 API:
| 分组 | 代表方法 | 说明 |
|---|---|---|
| 构造 | NonNull::new(ptr) →
Option<NonNull<T>>
unsafe { NonNull::new_unchecked(ptr) }
NonNull::dangling() |
安全 / 不安全 / 占位 |
| 引用视图 | unsafe fn as_ref /
as_mut |
把裸指针临时借用成 &T /
&mut T |
| 裸指针互转 | fn as_ptr |
取回 *mut T,解引用仍需 unsafe |
| 类型转换 | fn cast<U> fn cast_mut /
cast_const |
保留地址,换类型或可变性 |
| 地址运算 | unsafe fn add / byte_add / sub / offset |
指针算术,与 ptr::add 族一致 |
| 切片助手 | NonNull::slice_from_raw_parts(data, len)
fn len() |
1.70+ 稳定,为 [T] 提供非空裸切片构造 (rustwiki.org) |
⚠️ 任何把
NonNull重新解释为引用或解引用的操作,都必须在unsafe块里手动保证内存有效性与别名规则。
典型使用场景:
| 场景 | 作用 |
|---|---|
智能指针内部实现
(Box/Rc/Arc) |
需要 非空 原始指针存放被管对象,且要在 Option
等场景下节省空间 |
| 自研侵入式链表 / 红黑树 | 节点自身持有前后指针字段
NonNull<Node<T>>,天然避免空判分支 |
惰性初始化 / Vec::new |
先用 NonNull::dangling()
占位,等真正分配后再写入正确地址 |
| FFI | C API 明确保证参数永不为空时,用 NonNull
在类型层面表达前置条件 |
| 自引用结构(Pin 工作区) | 在完成真正初始化前使用 NonNull::dangling()
保存指向自身字段的指针 |
2. ManuallyDrop<T>
std::mem::ManuallyDrop<T>
是一个零成本(zero-cost)包装器。把值包在 ManuallyDrop
里会告诉编译器:请不要在作用域结束时自动调用它的
Drop
实现,什么时候释放(或是否释放)由我手动决定。
ManuallyDrop<T>只是把 "是否自动 drop" 这一语义从编译器搬到了程序员身上;- 它不改变
T的布局,也不会产生额外的字节,因此是零开销; - unsafe 责任:你必须保证一份值恰好析构一次(不能漏掉,也不能多调)。
它的核心 API 如表所示:
| API | 作用 | 重要注意点 |
|---|---|---|
ManuallyDrop::new(value) |
把 T 包装成
ManuallyDrop<T>,关闭自动析构 |
零开销;之后需显式 drop 或移动走 |
ManuallyDrop::drop(&mut self)
(unsafe) |
手动触发内部值的析构 | 必须保证之后不会再访问 / 再 drop |
ManuallyDrop::take(&mut self)
(unsafe) |
从包装里"搬走"值(等价于 ptr::read) |
v 里留下未初始化内存,不能再用或再 drop |
ManuallyDrop::into_inner(self)
(unsafe) |
消费 ManuallyDrop 返回内部值 |
取得所有权后,原包装已被移走(不会 double-drop) |