系列文章:

继上篇 Rust 实战丨手写一个 oneshot channel,本篇我们继续参考 Rust Atomics and Locks 一书,来实现一个 Arc

Arc 简介

ArcAtomic Reference Counted)是 Rust 标准库里位于 std::sync 模块中的智能指针,用于 在多个线程之间安全地共享只读数据。和只适用于单线程场景的 Rc<T> 不同,Arc<T> 的引用计数增减操作使用原子指令,从而保证跨线程的内存安全。

为什么需要 Arc?

1
2
3
4
5
6
7
let p = Person { age: 18, name: "hedon".to_string(), address: "China".to_string() };

thread::scope(|s| {
s.spawn(|| println!("{:?}", &p)); // ✅ scope 内的线程可以借用
});

thread::spawn(move || println!("{:?}", &p)); // ❌ 需要 'static 生命周期

问题thread::spawn 要求 'static 生命周期,但 &p 只是栈上变量的借用。
解决:使用 Arc::new(p) 把数据移到堆上,通过原子引用计数实现多所有权:

1
2
3
let p = Arc::new(Person { /* ... */ });
let p_clone = p.clone(); // 原子计数 +1
thread::spawn(move || println!("{:?}", p_clone)); // ✅ 编译通过

读完本篇你能学到什么

  • 原子操作的内存顺序选择:通过引用计数这个具体例子,理解 Relaxed / Acquire / Release / fence 的使用场景。
  • 弱引用解决循环引用Weak<T> 的设计原理和在图结构中的应用。
  • Arc::get_mut 的安全性保证:理解「非原子两步校验」的巧妙设计。
  • 零成本抽象的实现细节UnsafeCell + ManuallyDrop 相比 Option<T> 的优势。

v0: 基础引用计数

数据结构

我们先来分析一下基本的数据结构该如何定义,在之前的 Rust 实战丨手写一个 SpinLock,我们讲到了 C++/Rust 常用的并发编程方式 RAII(Resource Acquisition Is Initialization,资源获取即初始化),其核心思想是:在对象构造函数中获取资源,在析构函数中释放资源。上面的案例中,我们在初始化 Person 的时候其实也是应用了这种思路。

1
2
3
4
5
let p = Arc::new(Person {
age: 18,
name: "hedon".to_string(),
address: "China".to_string(),
});

所以我们的自定义 Arc 需要泛型 T,使其可以承载不同的数据类型的数据 data,同时为了做引用计数,它需要一个数值类型 ref_count 来计数,为了并发安全,我们可以选择原子类型 AtomicUsize

Arc 中,我们需要自己管理 data 的生命周期,除了使用裸指针 *mut T*const T 之外,我们可以使用 std::ptr::NonNull<T> ,它是一个零成本、保证非空、支持协变(可安全向子类型转换)的裸指针包装器,具体可参考附录 1. NonNull<T>

综上,我们可以定义以下数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct ArcData<T> {
ref_count: AtomicUsize,
data: T,
}

pub struct Arc<T> {
ptr: NonNull<ArcData<T>>,
}

impl<T> Arc<T> {
pub fn new(data: T) -> Self {
Arc {
ptr: NonNull::from(Box::leak(Box::new(ArcData {
ref_count: AtomicUsize::new(1),
data,
}))),
}
}
}
  1. 我们定义了 ArcDataArc 两个结构,其中 ArcData 包含引用计数 ref_count 和实际数据 data
  2. Arc 中,我们使用 NonNull 来管理 ArcData 的生命周期。初始化时,先用 Box::new() 在堆上分配内存,再通过 Box::leak() 放弃 Box<T> 的所有权,交由 Arc 自行管理。
graph TB
    subgraph Stack ["栈内存 Stack"]
        ArcStruct["Arc<T>
ptr: NonNull<ArcData<T>>"] end subgraph Heap ["堆内存 Heap"] ArcDataStruct["ArcData<T>
ref_count: AtomicUsize(1)
data: T"] end ArcStruct -->|指向| ArcDataStruct subgraph Process ["内存分配过程"] Step1["Box::new(ArcData)"] Step2["Box::leak()"] Step3["NonNull::from()"] Step1 --> Step2 Step2 --> Step3 end classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px classDef processStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px class ArcStruct stackStyle class ArcDataStruct heapStyle class Step1,Step2,Step3 processStyle

另外,跨线程发送 Arc<T> 会导致 T 对象被共享,即 T 需要满足 Sync trait,而跨线程发送 Arc<T> 也会导致需要由另外一个线程来释放 T,所以需要 T 满足 Send trait。所以只有当 T 满足 Send+Sync 的时候,Arc<T> 才是 Send 的,对于 Sync 也是同理,因为我们可以为 Arc<T> 分别实现 SyncSend trait:

1
2
unsafe impl<T: Send + Sync> Send for Arc<T> {}
unsafe impl<T: Send + Sync> Sync for Arc<T> {}

为了能够便捷的获取 data,我们先为 Arc<T> 实现一个 data() 用于获取 ArcData<T>,同时为其实现 Deref trait,用于像指针一样无感操作 data: T

1
2
3
4
5
6
7
8
9
10
11
12
impl<T> Arc<T> {
fn data(&self) -> &ArcData<T> {
unsafe { self.ptr.as_ref() }
}
}

impl<T> Deref for Arc<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.data().data
}
}

维护引用计数

基础部分我们已经铺垫完了,现在我们需要来实现 2 个最重要的 trait 了:

  • Clone: Arc 引用计数的关键,在每次 clone() 的时候,我们不拷贝 data,而是让 ref_count 自增,进行引用计数。
  • Drop: 在 Arc<T> 实例离开作用域的时候,我们需要让 ref_count 自减,同时在最后一个 Arc<T> 被销毁时,我们需要主动释放 ArcData<T> 的内存资源。

我们先来实现 Clone

1
2
3
4
5
6
7
impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
// TODO: 处理整型溢出的情况
self.data().ref_count.fetch_add(1, Ordering::Relaxed)
Arc { ptr: self.ptr }
}
}

因为这里没有其他原子操作需要跟当前操作建立严格的 happens-before 关系,所以这里我们可以使用最松的 Relaxed 内存顺序。

接下来看下 Drop

1
2
3
4
5
6
7
8
9
impl <T> Drop for Arc<T> {
fn drop(&mut self) {
if self.data().ref_count.fetch_sub(1, ???) == 1 { // <----- 这里需要什么使用内存顺序约束?
unsafe {
drop(Box::from_raw(self.ptr.as_ptr()));
}
}
}
}

对于 Drop,我们就需要好好想一想需要什么内存顺序约束了。在最后一个 drop 的时候,我们有 2 个目标:

  1. 不能过早释放:确保在引用计数减到 0 并销毁对象之前,没有别的线程仍在使用这份数据;
  2. 要看得见别人写的东西:如果别的线程在它们各自的 drop 里面对共享对象做了写入,最后一个线程做析构时必须"看到"这些写入,否则就可能出现数据竞争或次序错误。

换言之,我们需要最后一个 fetch_sub 跟前面其他每一个 fetch_sub 都建立起 happens before 关系,也即我们需要一对 Release 和 Acquire 来保证 happens-before。

这里简单回顾一下 ReleaseAcquire,不熟悉的读者可以参阅:Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序

  • Release: 作用于写操作(store),确保该操作之前的所有内存访问不会被重排到这个 Release 操作之后。
  • Acquire: 作用于读操作(load),确保该操作之后的所有内存访问不会被重排到这个 Acquire 操作之前。
  • 当一个线程通过 Acquire 读取到另一个线程通过 Release 写入的值时,会建立一个 happens-before 关系:线程 A 中 Release 写入之前的所有内存写操作,对于线程 B 中 Acquire 读取之后的所有内存读操作都是可见的

Release-Sequence 概念补充:当多个线程对同一个原子变量执行 Release 操作时,这些操作会形成一个 "release-sequence"。后续任何一个 Acquire 操作读取到这个序列中的任意值,都能与整个序列建立 happens-before 关系。这正是为什么我们的 fence(Acquire) 能够与之前所有线程fetch_sub(Release) 形成同步的关键。

我们当然可以使用 ReleaseAcquire 的结合体 AcqRel 来一步到位解决这个问题。不过考虑到只有最后一个 drop 需要满足这个关系,我们可以尝试做得更优雅一些。

在这种仅需在临界值保证 happens-before 的场景下,我们都可以单独在临界情况下使用一个 fence 来建立起 happens-before。

具体来说:

  • 对于非最终 drop:我们只需要使用 Release,即 fetch_sub(1, Ordering::Release),它保证了别的线程如果最终做"最后一次 drop",只要它对相同原子执行一条 Acquire 操作,它就能同步到前面所有线程对数据做过的改动。
  • 对于最终 drop:在调用 fetch_sub 的时候我们仍需要 Release 语义,但是调用时,我们并不知道自己是不是"最后那个线程"。当 fetch_sub 返回 1 的时候,说明我们是"最后那个线程"。这个时候,我们需要建立一个 fetch(Acquire),与之前的所有 Release 形成配对,确保看到之前所有的历史写入,这个时候我们才能确定已经没有别的线程在使用数据了,我们才可以安全地销毁对象。

如下图所示;

sequenceDiagram
    participant A as Thread A
    participant B as Thread B
    participant C as Thread C
(最后 drop) %% 普通写入 A->>A: write shared data … B->>B: write shared data … %% 非最终 drop:Release 写 A->>A: fetch_sub (Release) ⬅ 计数 n→n-1 B->>B: fetch_sub (Release) ⬅ 计数 n-1→n-2 %% 形成 release-sequence Note over A,B: 这两次 Release 写组成
同一条 release-sequence %% 最终 drop:Release 写 & 返回 1 C->>C: fetch_sub (Release) ⬅ 返回 1 → 计数 0 %% Acquire fence 同步 C-->>C: fence (Acquire)
【接收 release-sequence】 %% 安全析构 C->>C: drop(Box::from_raw) %% 结果说明 Note over C: fence (Acquire) 使 A/B
对共享数据的写必定
在析构前可见

经过这么一顿分析后,我们最终的 Drop 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
impl<T> Drop for Arc<T> {
fn drop(&mut self) {
// ❶ 所有 `drop` 都执行:Release
if self.data().ref_count.fetch_sub(1, Ordering::Release) == 1 {
// ❷ 只有最后一个引用才会进来
std::sync::atomic::fence(Ordering::Acquire); // ❸ 补上 Acquire
unsafe {
// ❹ 现在可以安全地回收并析构
drop(Box::from_raw(self.ptr.as_ptr()));
}
}
}
}

实现逻辑:

  1. 所有 drop 都用 Release 语义减引用计数,为可能的"最后一次 drop"做准备
  2. 只有最后一次 drop(返回值为 1)才需要额外的 fence(Acquire) 与之前所有的 Release 建立 happens-before
  3. 安全析构:现在可以确保看到所有历史写入,没人再持有引用

完整代码

自此,我们第一个版本的 Arc 就实现完毕了,我们来看一下最终完成的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use std::sync::atomic::{AtomicUsize, Ordering, fence};
use std::ptr::NonNull;
use std::ops::Deref;

struct ArcData<T> {
ref_count: AtomicUsize,
data: T,
}

pub struct Arc<T> {
ptr: NonNull<ArcData<T>>,
}

impl<T> Arc<T> {
pub fn new(data: T) -> Self {
Arc {
ptr: NonNull::from(Box::leak(Box::new(ArcData {
ref_count: AtomicUsize::new(1),
data,
}))),
}
}

fn data(&self) -> &ArcData<T> {
unsafe { self.ptr.as_ref() }
}
}

unsafe impl<T: Send + Sync> Send for Arc<T> {}
unsafe impl<T: Send + Sync> Sync for Arc<T> {}

impl<T> Deref for Arc<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.data().data
}
}

impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
self.data().ref_count.fetch_add(1, Ordering::Relaxed);
Arc { ptr: self.ptr }
}
}

impl<T> Drop for Arc<T> {
fn drop(&mut self) {
if self.data().ref_count.fetch_sub(1, Ordering::Release) == 1 {
fence(Ordering::Acquire);
unsafe {
drop(Box::from_raw(self.ptr.as_ptr()));
}
}
}
}

单元测试

写个单元测试验证一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#[test]
fn arc_should_work() {
static NUM_DROPS: AtomicUsize = AtomicUsize::new(0);

struct DetectDrop;
impl Drop for DetectDrop {
fn drop(&mut self) {
NUM_DROPS.fetch_add(1, Ordering::Relaxed);
}
}

// 创建 2 个 Arc 共享一个元组,包含一个字符串和 DetectDrop 对象
let x = Arc::new(("hedon", DetectDrop));
let y = x.clone();

// 将 x 转移到另外一个线程并使用它
let t = thread::spawn(move || {
assert_eq!(x.0, "hedon"); // 可以正常使用
});

// 这里,y 应该也是可以正常使用的
assert_eq!(y.0, "hedon");

// 等待 t 线程执行完毕
t.join().unwrap();

// 这个时候 `x` 已经被释放了,但是`y` 还没有被释放,
// 所以 `DetectDrop` 应该还没被释放。
assert_eq!(NUM_DROPS.load(Ordering::Relaxed), 0);

// 手动释放掉 `y`,这是最后一个 `Arc`,所以 `DetectDrop` 应该会被释放
drop(y);
assert_eq!(NUM_DROPS.load(Ordering::Relaxed), 1);
}

执行结果是 ok 的:

1
2
3
4
5
6
7
8
9
running 1 test
test arc::tests::arc_should_work ... ok

successes:

successes:
arc::tests::arc_should_work

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s

v1: 实现 get_mut 获取可变引用

按照前面版本的实现,我们是不能为 Arc<T> 实现 DerefMut trait 的,因为我们不能无条件地提供 &mut T 可变引用,因为它可能同时被其他的 Arc<T> 所访问中。

不过,当满足一定条件的时候,我们还是可以提供 &mut T 可变引用的。具体来说,需要满足 2 个条件:

  1. 使用 &mut Arc<T> 保证当前 Arc<T> 只有一个地方在使用;
  2. 需要确保全局只有一个 Arc<T>

为了避免跟 DerefMut 混淆,我们将其声明为一个静态方法,并将 Arc<T> 作为参数,同时因为只有满足特定条件的情况下,才能返回 &mut T,所以我们使用 Option 作为返回值。如下:

1
2
3
4
5
6
7
8
9
10
pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> {
if arc.data().ref_count.load(Ordering::Relaxed) == 1 {
fence(Ordering::Acquire);
// Safety: 没有其他地方可以访问数据,
// 因为我们使用了 &mut 独占引用,而且此时只有一个 `Arc`。
unsafe { Some(&mut arc.ptr.as_mut().data) }
} else {
None
}
}
  1. 这里我们在确定只有一个 Arc 实例的时候,使用 fetch(Acquire) 来跟所有其他 Arc 执行 drop()fetch_sub(Release) 建立 happens-before 关系。

  2. 因为此时 ref_count=1,说明仅有当前 Arc 这一个实例,而 get_mut 需要的又是独占引用,所以当前 Arc 不可能再被拿去做 clone() 操作,所以在这个情况下,是可以保证有且仅有一个 Arc 实例,所以我们是可以安全返回 &mut T 的。

  3. 返回的 &mut T 会隐式地借用参数 &mut Arc<T> 的生命周期,而 Rust 不允许可变引用和只读引用交叉存在,所以当前仅剩的这个 Arc,直到 &mut T 作用域结束之前,都是不可用的,所以在那之前,不会再被拿去执行 clone()Deref 等操作,所以是安全的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #[test]
    fn get_mut_should_be_safe() {
    let mut x = Arc::new(vec![]);
    let v = Arc::get_mut(&mut x).unwrap();
    v.push(1);

    let y = x.clone(); // <---- cannot borrow `x` as immutable
    v.push(2);
    }

    如上面这个例子,就会报错:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    error[E0502]: cannot borrow `x` as immutable because it is also borrowed as mutable
    --> src/arc.rs:117:17
    |
    114 | let v = Arc::get_mut(&mut x).unwrap();
    | ------ mutable borrow occurs here
    ...
    117 | let y = x.clone();
    | ^ immutable borrow occurs here
    118 | v.push(2);
    | - mutable borrow later used here

v2: 弱指针 Weak<T> 解决循环引用

循环引用问题分析

graph TD
    A --> B;
    A --> C;

引用计数在各种数据结构的表示中非常有用,如上图所示的树形结构,当释放 A 的时候,因为 B 和 C 不再被引用,所以也可以顺带释放了。

但是,如果 B 和 C 也持有对 A 的引用,即形成了循环引用,那按照我们之前的实现,A、B、C 都将永远不会被释放了,因为它们的引用计数永不为 0,哪怕它们三者均不再被使用。如下图所示:

graph TD
    A <--> B;
    A <--> C;
    B -.-> A;
    C -.-> A;

为了应对这种场景,Rust 的标准库中提出的解决方案是:Weak<T>,也称弱指针T 可以在 Arc<T>Weak<T> 之间共享,当所有的 Arc<T> 都失效的时候,T 被释放,无论此时是否有 Weak<T> 的存在。

如下图所示,父节点对子节点使用的是强引用,确保了父节点存活的时候,子节点都存在。而子节点对父节点使用的是弱引用,只即保留了子节点回溯父节点的能力,也不会阻止父节点的释放。

当父节点被释放时,所有强引用计数归零,节点可以依次被释放。

graph TD
    A -- Arc --> B;
    A -- Arc --> C;
    B -.->|Weak| A;
    C -.->|Weak| A;

数据结构

OK,做了这么多铺垫后,我们来思考一下现在的 ArcData 该如何调整。

  1. 之前我们用 ref_count 做引用计数,它代表的都是强引用,现在我们需要记录弱引用数量的相关字段。
  2. 当只有弱引用的时候,data 就已经被释放了,我们需要使用 None 来表示这种情况,所以 data 应该是一个 Option<T> 类型。
  3. ArcData<T> 被一个 Arc 和多个 Weak 共享时,释放最后一个 Arc 时,我们仅拥有 &ArcData<T> 不可变引用,这个时候我们需要将其从 Some(T) 置为 None,即要在不可变引用上实现修改,这就涉及到了前几篇提到的:内部可变性UnsafeCell 是 Rust 提供的一个内部可变性工具类型,它包装一个数据,使得即使在只有不可变引用的情况下也可以进行修改(当然需要在 unsafe 块中操作)。所以我们需要将 data 再用 UnsafeCell 包一层,以满足此场景的需求。

综上,最新的 ArcData 定义如下:

1
2
3
4
5
6
7
8
struct ArcData<T> {
// `Arc` 的数量,也即数据 T 的强引用计数。
data_ref_count: AtomicUsize,
// `Arc` + `Weak` 的数量。
alloc_ref_count: AtomicUsize,
// 数据,当只有 `Weak` 的时候,为 None。
data: UnsafeCell<Option<T>>,
}

然后我们需要定义一个新结构 Weak<T> ,用来表示弱引用,假定这个时候,我们将维护 ArcData 存活的职责交给 Weak<T>,那就可以将 ArcData 转给 Weak<T> 持有,然后在 Arc<T> 中持有一个 Weak<T>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
struct ArcData<T> {
// `Arc` 的数量,也即数据 T 的强引用计数。
data_ref_count: AtomicUsize,
// `Arc` + `Weak` 的数量。
alloc_ref_count: AtomicUsize,
// 数据,当只有 `Weak` 的时候,为 None。
data: UnsafeCell<Option<T>>,
}

pub struct Arc<T> {
weak: Weak<T>,
}

pub struct Weak<T> {
ptr: NonNull<ArcData<T>>,
}

unsafe impl<T: Send + Sync> Send for Weak<T> {}
unsafe impl<T: Send + Sync> Sync for Weak<T> {}

impl<T> Arc<T> {
pub fn new(data: T) -> Self {
Arc {
weak: Weak {
ptr: NonNull::from(Box::leak(Box::new(ArcData {
alloc_ref_count: AtomicUsize::new(1),
data_ref_count: AtomicUsize::new(1),
data: UnsafeCell::new(Some(data)),
}))),
},
}
}
}

impl<T> Weak<T> {
fn data(&self) -> &ArcData<T> {
unsafe { self.ptr.as_ref() }
}
}

impl<T> Deref for Arc<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
let ptr = self.weak.data().data.get();
// Safety: 这个时候还有 Arc 存在,所以 data 肯定是生效的。
unsafe { (*ptr).as_ref().unwrap() }
}
}

在上述代码中,除了数据结构之外,我们做了 3 点调整:

  1. 只需要为 Weak<T> 实现 SyncSend trait,这个时候 Arc<T> 就会被自动实现这 2 个 triat。
  2. data(&self) 辅助函数,移到了 Weak<T> 身上。
  3. Arc<T> 要从 Deref 获取 &T,需要先经过一道 Weak<T>
graph LR
    subgraph Stack ["栈内存 Stack"]
        ArcStruct["Arc<T>
weak: Weak<T>"] WeakStruct["Weak<T>
ptr: NonNull<ArcData<T>>"] end subgraph Heap ["堆内存 Heap"] ArcDataStruct["ArcData<T>
data_ref_count: AtomicUsize(1)
alloc_ref_count: AtomicUsize(1)
data: UnsafeCell<Option<T>>"] end ArcStruct -->|包含| WeakStruct WeakStruct -->|指向| ArcDataStruct classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px classDef processStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px classDef refStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px class ArcStruct,WeakStruct stackStyle class ArcDataStruct heapStyle class Step1,Step2,Step3,Step4,Step5 processStyle class DataRef,AllocRef,DataContent refStyle

维护引用计数

现在重点来了,我们需要来思考 Weak<T>Arc<T>CloneDrop 该如何实现。其实重点就是该如何管理 alloc_ref_countdata_ref_count 的计数

再次明确下我们的定义:

  • data_ref_count: 代表的是强引用 Arc<T> 的数量。
  • alloc_ref_count: 代表的是 Arc<T> + Weak<T> 的数量。

因此:

  • Clone:
    • Weak<T> 拷贝时,仅增加了弱引用的数量,所以我们只需对 alloc_ref_count 进行自增。
    • Arc<T> 拷贝时,不仅需要拷贝内部的 Weak<T>,还增加了强引用的数量,所以我们还需要对 data_ref_count 进行自增。
  • Drop:
    • Arc<T> 被释放时,不仅释放了其内部的 Weak<T>,还减少了一个强引用,所以需要对 data_ref_count 进行自减。另外,如果是最后一个 Arc<T> 被释放,我们需要将 ArcData<T>.data 置为 None
    • Weak<T> 被释放时,我们仅需减少弱引用的数量,即对 alloc_ref_count 进行自减。另外,当最后一个 Weak<T> 被释放时(此时肯定也没有 Arc<T> 了),我们还需要负责释放 ArcData<T>

综上,我们的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
impl<T> Clone for Weak<T> {
fn clone(&self) -> Self {
if self.data().alloc_ref_count.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
std::process::abort();
};
Self { ptr: self.ptr }
}
}

impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
let weak = self.weak.clone();
if weak.data().data_ref_count.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
std::process::abort();
}
Arc { weak }
}
}

impl<T> Drop for Weak<T> {
fn drop(&mut self) {
if self.data().alloc_ref_count.fetch_sub(1, Ordering::Release) == 1 {
fence(Ordering::Acquire);
// Safety: 最后一个 Weak<T> 已经被释放了。
unsafe {
drop(Box::from_raw(self.ptr.as_ptr()));
}
}
}
}

impl<T> Drop for Arc<T> {
fn drop(&mut self) {
if self
.weak
.data()
.data_ref_count
.fetch_sub(1, Ordering::Release)
== 1
{
fence(Ordering::Acquire);
let ptr = self.weak.data().data.get();
// Safety: data_ref_count 已经为 0 了,没有地方在使用 data 了。
unsafe { (*ptr) = None }
}
}
}

这里考虑到实际场景中的内存限制,我们对引用计数进行了简单的限制,当其超过 usize::Max/2 的时候,就执行 std::process:abort() 让整个进程崩溃。

get_mut()

接下来我们来考虑下 get_mut(arc: &mut Arc<T>) 该如何修改。什么时候可以返回 &mut T,很明显,当且仅当只有一个 Arc<T> 的时候才可以

get_mut 的实现修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> {
if arc.weak.data().alloc_ref_count.load(Ordering::Relaxed) == 1 {
fence(Ordering::Acquire);
// Safety: 这个时候没有其他地方能使用数据,因为只有一个 `Arc`,
// 同时我们拥有仅存的这个 `Arc` 的不可变引用。
let arcdata = unsafe { arc.weak.ptr.as_mut() };
let option = arcdata.data.get_mut();
let data = option.as_mut().unwrap();
Some(data)
} else {
None
}
}

到这里,我们执行之前的 arc_should_work 测试用例,可以发现是顺利通过的。

dowgrade()

回顾下面这张图,为了提供 A 的 Weak<T> 指针,我们需要给 Arc<T> 提供一个 downgrade() 方法,用于将强引用降为弱引用,这是必然可以成功的。同时,当我们也可以为 Weak<T> 提供一个 upgrade() 方法,用于持有弱引用的情况下可以尝试访问数据,当然这未必能成功。

graph TD
    A -- Arc --> B;
    A -- Arc --> C;
    B -.->|Weak| A;
    C -.->|Weak| A;

downgrade() 比较简单,我们直接 clone() 一个 Weak<T> 就可以了:

1
2
3
4
5
impl<T> Arc<T> {
pub fn downgrade(&self) -> Weak<T> {
self.weak.clone()
}
}

upgrade()

upgrade() 就比较复杂了,只有当存在 Arc 的时候,data 才没被释放,这个时候,才能返回升级后的 Arc<T>,即要求 data_ref_count>0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
impl<T> Weak<T> {
pub fn upgrade(&self) -> Option<Arc<T>> {
// 获取 data_ref_count 的值
let mut n = self.data().data_ref_count.load(Ordering::Relaxed);
loop {
// 如果等于 0,则说明 data 已经被释放了,直接返回 None
if n == 0 {
return None;
}
assert!(n < usize::MAX);
// 不为 0 的话,data_ref_count 尝试进行 +1,失败了就重试
if let Err(e) = self.data().data_ref_count.compare_exchange_weak(
n,
n + 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
n = e;
continue;
}
// 成功了,则 clone weak 即可(执行 alloc_ref_count++)
return Some(Arc { weak: self.clone() });
}
}
}

我们来写一下测试用例,验证使用了 Weak<T> 后,我们的资源能否正确释放。在这之前,我们先写一个非 Weak<T> 版本的,看看资源是否真的没有被释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#[test]
fn no_weak_should_not_free_resource() {
static NUM_DROPS: AtomicUsize = AtomicUsize::new(0);
struct Node {
// 使用 RefCell,利用其内部可变性,方便我们建立父子关系
child: RefCell<Vec<Arc<Node>>>,
parent: RefCell<Option<Arc<Node>>>, // <---- 这里持有的是强引用
}

impl Drop for Node {
fn drop(&mut self) {
NUM_DROPS.fetch_add(1, Ordering::Relaxed);
}
}

impl Node {
pub fn new() -> Self {
Self {
child: RefCell::new(vec![]),
parent: RefCell::new(None),
}
}

// 建立父子关系
pub fn add_child(parent: &Arc<Node>, child: Arc<Node>) {
*child.parent.borrow_mut() = Some(parent.clone());
parent.child.borrow_mut().push(child);
}
}

// 限定作用域,离开作用域后,root/child1/child2 正常情况应该被释放。
{
let root = Arc::new(Node::new());
let child1 = Arc::new(Node::new());
let child2 = Arc::new(Node::new());

Node::add_child(&root, child1);
Node::add_child(&root, child2);
}

assert_ne!(NUM_DROPS.load(Ordering::Relaxed), 3); // 不为 3
}

执行上述用例后,我们可以发现 NUM_DROPS 还是 0,即 root/child1/child2 均没有被释放资源。

你可以将 parent 的引用修改为弱引用,然后再重新执行测试用例,就会发现 root/child1/child2 的资源都被正确释放了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct Node {
child: RefCell<Vec<Arc<Node>>>,
parent: RefCell<Option<Weak<Node>>>, // <---- 这里持有的是弱引用
}

impl Node {
pub fn new() -> Self {
Self {
child: RefCell::new(vec![]),
parent: RefCell::new(None),
}
}

// 建立父子关系
pub fn add_child(parent: &Arc<Node>, child: Arc<Node>) {
*child.parent.borrow_mut() = Some(parent.downgrade()); // <---- 使用 downgrade 转为弱引用
parent.child.borrow_mut().push(child);
}
}

v3: 分离强弱引用,避免无用消耗

上个版本我们通过引入了弱引用 Weak<T>,成功解决了循环引用的问题,这是个非常大的进步!

不过我们仍然有进一步优化的空间,可以观察到,Arc<T> 的每次拷贝,都会伴随一次拷贝 Weak<T>,但是,很多时候,我们其实没有循环引用关系的,也即我们并不是每一次都需要 Weak<T>,所以上个版本的实现,其实是有很多的浪费的。

数据结构

所以我们可以考虑将 Weak<T>Arc<T> 中分离出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
pub struct Arc<T> {
ptr: NonNull<ArcData<T>>,
}

unsafe impl<T: Send + Sync> Send for Arc<T> {}
unsafe impl<T: Send + Sync> Sync for Arc<T> {}

pub struct Weak<T> {
ptr: NonNull<ArcData<T>>,
}

unsafe impl<T: Send + Sync> Send for Weak<T> {}
unsafe impl<T: Send + Sync> Sync for Weak<T> {}

impl<T> Arc<T> {
// 调整 Arc 的构造函数
pub fn new(data: T) -> Self {
Arc {
ptr: NonNull::from(Box::leak(Box::new(ArcData {
data_ref_count: AtomicUsize::new(1),
alloc_ref_count: AtomicUsize::new(1),
data: UnsafeCell::new(ManuallyDrop::new(data)),
}))),
}
}

// Arc 已经没有 Weak 了,这个时候,时候给它补一个 data() 辅助函数
fn data(&self) -> &ArcData<T> {
unsafe { self.ptr.as_ref() }
}
}

// 相应地调整 Deref
impl<T> Deref for Arc<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.data().data.get() }
}
}

同时我们需要对 ArcData<T> 结构中的引用技术进行重新定义:

1
2
3
4
5
6
7
8
struct ArcData<T> {
// `Arc` 的数量
data_ref_count: AtomicUsize,
// `Weak` 的数量,当存在 `Arc` 时,额外加 1
alloc_ref_count: AtomicUsize,
// 数据,当只有 `Weak` 的时候,释放它。
data: UnsafeCell<ManuallyDrop<T>>,
}

我们总共做了 3 个重要的调整:

  1. Arc<T> 不再包含 Weak<T>,而是各自独立,不过它们之前会共享底层的 ArcData<T>

  2. alloc_ref_count 的语义,从"Arc + Weak 的数量",变成"Weak 的数量,当存在 Arc 时,额外加 1"。

    换言之,对于所有的 Arc<T>,都共有一个隐式的 Weak<T> ,当释放最后一个 Arc<T> 的时候,这个隐式的 Weak<T> 也需要被释放(本质是执行 alloc_ref_count 减一,同时如果没有其他的 Weak<T>,需要顺带释放 ArcData<T>

  3. data 字段,我们使用 ManuallyDrop 来替代 Option,我们之前使用 None 来表示数据已经被释放,但其实 data_ref_count 已经能表达这层意思了,所以我们这里使用 ManuallyDrop 来进一步节省内存资源。

std::mem::ManuallyDrop<T> 是一个零成本(zero-cost)包装器。它不改变 T 的布局,也不会产生额外的字节,在一些情况下,可以比 Option<T> 少一些标记位和字节对齐所占据的额外空间。具体可参考:附录 2. ManuallyDrop<T>

graph TB
    subgraph Stack ["栈内存 Stack"]
        ArcStruct["Arc<T>
ptr: NonNull<ArcData<T>>"] WeakStruct["Weak<T>
ptr: NonNull<ArcData<T>>"] end subgraph Heap ["堆内存 Heap"] ArcDataStruct["ArcData<T>
data_ref_count
alloc_ref_count
data"] end ArcStruct -->|直接指向| ArcDataStruct WeakStruct -->|直接指向| ArcDataStruct classDef stackStyle fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef heapStyle fill:#f3e5f5,stroke:#4a148c,stroke-width:2px class ArcStruct,WeakStruct stackStyle class ArcDataStruct heapStyle

维护引用计数

修改了引用计数的语义后,我们需要重新思考如何管理引用计数。

  • Clone:
    • Weak<T> 拷贝时,仅增加了弱引用的数量,所以我们依旧只需对 alloc_ref_count 进行自增。
    • Arc<T> 拷贝时,这个时候,我们就只需要对 data_ref_count 进行自增即可。
  • Drop:
    • Arc<T> 被释放时,我们需要对 data_ref_count 进行自减。另外,如果是最后一个 Arc<T> 被释放,我们需要释放 data ,同时,我们还需要释放那个代表所有 Arc 的隐式 Weak
    • Weak<T> 被释放时,我们仅需减少弱引用的数量,即对 alloc_ref_count 进行自减。另外,当最后一个 Weak<T> 被释放时,我们还需要负责释放 ArcData<T>

具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
impl<T> Clone for Weak<T> {
fn clone(&self) -> Self {
if self.data().alloc_ref_count.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
std::process::abort();
};
Self { ptr: self.ptr }
}
}

impl<T> Clone for Arc<T> {
fn clone(&self) -> Self {
if self.data().data_ref_count.fetch_add(1, Ordering::Relaxed) > usize::MAX / 2 {
std::process::abort();
}
Self { ptr: self.ptr }
}
}

impl<T> Drop for Weak<T> {
fn drop(&mut self) {
// 思考下这里为什么要用 Release?后面我们会揭晓!
if self.data().alloc_ref_count.fetch_sub(1, Ordering::Release) == 1 {
fence(Ordering::Acquire);
// Safety: 最后一个 Weak<T> 已经被释放了。
unsafe {
// 释放 ArcData<T>
drop(Box::from_raw(self.ptr.as_ptr()));
}
}
}
}

impl<T> Drop for Arc<T> {
fn drop(&mut self) {
if self.data().data_ref_count.fetch_sub(1, Ordering::Release) == 1 {
fence(Ordering::Acquire);
// 最后一个 `Arc` 被释放,需要做 2 件事情:
// 1. 释放 ArcData.data
// 2. 释放那个代表所有 `Arc` 的隐式 `Weak`
unsafe {
ManuallyDrop::drop(&mut *self.data().data.get());
}
// 在 `Weak` 那边 `drop()` 的时候:
// 1. 会执行 alloc_ref_count--;
// 2. 如果刚好是最后一个 `Weak`,那会顺带销毁 `AraData`。
drop(Weak { ptr: self.ptr });
}
}
}

对于这个隐式 Weak<T>,如果你一时无法很好地 Get 到那个点,可以尝试手动写写整个实现的过程,相信多走几遍流程,你会有那种茅塞顿开的感觉!

flowchart TD
    subgraph Clone["Clone 操作"]
        WeakClone["Weak<T>::clone()"]
        ArcClone["Arc<T>::clone()"]

        WeakClone --> WeakInc["alloc_ref_count.fetch_add(1)"]
        ArcClone --> ArcInc["data_ref_count.fetch_add(1)"]

        WeakInc --> WeakCheck{"计数 > usize::MAX/2?"}
        ArcInc --> ArcCheck{"计数 > usize::MAX/2?"}

        WeakCheck -->|是| Abort1["std::process::abort()"]
        ArcCheck -->|是| Abort2["std::process::abort()"]
        WeakCheck -->|否| WeakNew["返回新的 Weak<T>"]
        ArcCheck -->|否| ArcNew["返回新的 Arc<T>"]
    end

    subgraph Drop["Drop 操作"]
        WeakDrop["Weak<T>::drop()"]
        ArcDrop["Arc<T>::drop()"]

        WeakDrop --> WeakDec["alloc_ref_count.fetch_sub(1, Release)"]
        ArcDrop --> ArcDec["data_ref_count.fetch_sub(1, Release)"]

        WeakDec --> WeakDropCheck{"返回值 == 1?
最后一个 Weak?"} ArcDec --> ArcDropCheck{"返回值 == 1?
最后一个 Arc?"} WeakDropCheck -->|是| WeakFence["fence(Acquire)"] WeakDropCheck -->|否| WeakEnd["结束"] ArcDropCheck -->|是| ArcFence["fence(Acquire)"] ArcDropCheck -->|否| ArcEnd["结束"] WeakFence --> WeakFree["释放 ArcData<T>
Box::from_raw(ptr)"] ArcFence --> ArcDataFree["释放 data
ManuallyDrop::drop()"] ArcDataFree --> ImplicitWeak["释放隐式 Weak
drop(Weak { ptr })"] ImplicitWeak --> WeakDrop end subgraph Memory["内存释放顺序"] Step1["① Arc 释放 data 内容"] Step2["② Arc 释放隐式 Weak"] Step3["③ Weak 释放 ArcData 结构"] Step1 --> Step2 Step2 --> Step3 end classDef cloneStyle fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px classDef dropStyle fill:#ffebee,stroke:#c62828,stroke-width:2px classDef memStyle fill:#fff3e0,stroke:#e65100,stroke-width:2px classDef criticalStyle fill:#fce4ec,stroke:#880e4f,stroke-width:3px class WeakClone,ArcClone,WeakInc,ArcInc,WeakNew,ArcNew cloneStyle class WeakDrop,ArcDrop,WeakDec,ArcDec,WeakEnd,ArcEnd dropStyle class WeakFree,ArcDataFree,ImplicitWeak criticalStyle class Step1,Step2,Step3 memStyle

至此,我们重新运行之前的测试用例 arc_should_work(),可以发现的成功通过的。

下面我们继续来调整 get_mutdowngrade()upgrade()

upgrade()

upgrade() 比较简单,它的规则还是不变,只有当存在 Arc<T> 时,即 data_ref_count>0 时,才能升级成功:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
impl<T> Weak<T> {
//...

pub fn upgrade(&self) -> Option<Arc<T>> {
// 获取 data_ref_count 的值。
let mut n = self.data().data_ref_count.load(Ordering::Relaxed);
loop {
// 如果为 0,表示已经没有 Arc 了,升级失败。
if n == 0 {
return None;
}
assert!(n < usize::MAX);

// 存在 Arc,则对 data_ref_count 尝试进行 CAS 加 1。
if let Err(e) = self.data().data_ref_count.compare_exchange_weak(
n,
n + 1,
Ordering::Relaxed,
Ordering::Relaxed,
) {
// CAS 失败,则重试。
n = e;
continue;
}

// CAS 成功,则返回升级后的 Arc。
return Some(Arc { ptr: self.ptr });
}
}
}

get_mut()

接一下我们看 get_mut,规则也是不变:当前仅当只有一个 Arc<T>,且不存在 Weak<T> 时,才可以返回可变引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 两个条件
// 1. 没有 Weak<T>
// 2. 没有其他的 Arc<T>
pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> {
// 将 alloc_ref_count 从 1 置为 usize::Max。
// 如果失败:说明之前不是 1,即存在其他的 Weak<T>,无法获取 &mut T, None
// 如果成功:说明当前没有 Weak<T>,第一步校验通过。
if arc
.data()
.alloc_ref_count
.compare_exchange(1, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
return None;
}

let is_unique = arc.data().data_ref_count.load(Ordering::Relaxed) == 1;

// 释放 alloc_ref_count
arc.data().alloc_ref_count.store(1, Ordering::Release);

// 如果存在其他的 Arc,则无法获取 &mut T,返回 None
if !is_unique {
return None;
}

// 不存在其他的 Arc,返回 &mut T
fence(Ordering::Acquire);
unsafe { Some(&mut *arc.data().data.get()) }
}

代码的逻辑就不赘述了,但是这里有 3 个原子操作的内存顺序我们需要讨论一下!

分别是:

1
2
3
4
5
// 获取 Weak<T> 的数量并暂时锁定
arc
.data()
.alloc_ref_count
.compare_exchange(1, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
1
2
3
4
// 获取 Arc<T> 的数量
let is_unique = arc.data().data_ref_count.load(Ordering::Relaxed) == 1;
...
fence(Ordering::Acquire);
1
2
// 释放 alloc_ref_count
arc.data().alloc_ref_count.store(1, Ordering::Release);

首先看第 1 个,我们要判断当前是否已经没有 Weak<T> 了,所以我们需要保证看到之前所有 drop(Weak<T>) 的写入操作,所以这里需要建立起一个 happens-before,因此我们之前为 Weak<T> 实现 Drop trait 的时候,使用的是 Release,而这里,需要使用 Acquire 来进行配对,建立 happens-before。


然后看第 2 个,这里我们要判断是否仅有一个 Arc<T>,所以我们需要保证看到之前所有 drop<Arc<T> 的写入操作,因此我们之前为 Arc<T> 实现 Drop trait 的时候,使用的是 Release。但是在这里,我们仅需在仅剩 1 个 Arc 的时候,才有必要建立 happens-before,所以当 is_unique=true 时,我们补一个 fence(Acquire) 屏障,来建立跟 drop(Arc<T>) 的 happens-before。


在分析第 3 个之前,我们需要来尝试挖一下当前 get_mut 的漏洞!相信有部分读者在看到上述实现后,跟笔者一样,会有一个疑惑:上述 2 个条件的检查,并不是原子的,这样的检查还安全可靠吗?

比如说:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pub fn get_mut(arc: &mut Arc<T>) -> Option<&mut T> {
if arc
.data()
.alloc_ref_count
.compare_exchange(1, usize::MAX, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
return None;
}

// <------- 在这个间隙:另外一个 Arc downgrade() -> Weak,然后 drop(Arc)
// <------- 那它也是检查通过的。

let is_unique = arc.data().data_ref_count.load(Ordering::Relaxed) == 1;
arc.data().alloc_ref_count.store(1, Ordering::Release);
if !is_unique {
return None;
}
fence(Ordering::Acquire);
unsafe { Some(&mut *arc.data().data.get()) }
}

在上面这个例子中:

  1. 假设我们已经通过了第一个检查;
  2. 在进行第二个检查之前的这个间隙中,存在另外一个 Arc<T>
  3. 然后它通过 downgrade() 生成了一个 Weak<T>
  4. 然后再 drop(Arc<T>)
  5. 这个时候我们再检查 Arc<T> 的时候,会发现只有 1 个,检查就通过了。但是其实这个时候,存在了其他的 Weak<T>,所以是有问题的!
sequenceDiagram
    participant A as T1 :get_mut()
    participant B as T2 :downgrade()+drop(Arc)
    participant C as ArcData

    A->>C: CAS alloc_ref 1→MAX ✔
    Note over A,B: 非原子窗口
    B->>C: clone Weak 
alloc_ref++ (Relaxed) B->>C: drop(Arc)
fetch_sub data_ref (Release) A->>C: load data_ref ==1 ✔ A->>C: store alloc_ref MAX→1 (Release) A-->>A: fence(Acquire) → 返回 &mut T(已失效)

所以:我们不能让  downgrade()  在这个间隙中成功执行!这也是我们在前面将 alloc_ref_count 置为 usize::Max(上锁)的原因,在后面的 downgrade() 中,我们肯定要检查这个值,如果 alloc_ref_count=usize::Max,就不能 downgrade() 成功,直到 alloc_ref_store(1, _) 的时候,才能 downgrade() 成功。不过这个时候已经晚了!如果是这样,那 is_unique 肯定就不为 true,所以会返回 None,这个时候,我们不会返回 &mut T,所以,危机就解除了!

downgrade()

我们先来看一下 downgrade() 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
impl<T> Arc<T> {
// ...

pub fn downgrade(&self) -> Weak<T> {
// 获取 Weak 的引用计数
let mut n = self.data().alloc_ref_count.load(Ordering::Relaxed);
loop {
// 如果为 usize::MAX,说明已经被锁住了,这个时候自旋重试!
if n == usize::MAX {
std::hint::spin_loop();
n = self.data().alloc_ref_count.load(Ordering::Relaxed);
}
assert!(n < usize::MAX - 1);

// alloc_ref_count 没被锁住,尝试进行 +1 操作。
if let Err(e) = self.data().alloc_ref_count.compare_exchange_weak(
n,
n + 1,
Ordering::Acquire,
Ordering::Relaxed,
) {
// +1 失败,重试。
n = e;
continue;
}
// +1 成功,表示降级成功,返回 Weak。
return Weak { ptr: self.ptr };
}
}
}

downgrade() 的实现中,我们跟 get_mut 遥相呼应:

  1. 如果 alloc_ref_count=usize::MAX,说明被锁住,这个时候需要自旋等待并重试。

    std::hint::spin_loop()会向 CPU 发送特定指令(如 x86 的 pause 或 ARM 的 yield),提示当前处于忙等待状态,有利于优化 CPU 行为。

  2. 如果 alloc_ref_count 没被锁住,我们尝试进行 CAS,这里成功的时候使用的 Acquire ,为什么呢?这是为了跟前面还未讨论的第 3 个原子操作建立 happens-before!

现在我们终于可以来解开这第 3 个原子操作的原子顺序谜团了:

1
2
// 释放 alloc_ref_count
arc.data().alloc_ref_count.store(1, Ordering::Release);

这里我们必须保证跟 download()compare_exchange(_,_,Acquire,_) 建立起 happens-before 关系,即将 alloc_ref_count 置为 1 的结果必须被 downgrade() 所在的线程看到。不然的话,这里 compare_exchange 成功了,将 alloc_ref_count 置为 2 了,但是 get_mut 又将其置为 1 了,就乱套了!

完整代码

到这里我们终于是完成了 v3 版本的优化工作了,真棒!介于篇幅已经够长了,这里就不再贴出完整的代码了,感兴趣的读者可以参阅:hedon-rust-road/conutils/arc

浅探标准库的 Arc<T>

在最后,我们来看一下标准库的 Arc<T> 是如何实现的,看文章开头说的实现一个可以媲美标准库的 Arc<T> 是不是在吹牛!

标准库(rustc 1.87.0)的 Arc<T> 位于 sync.rs 文件中,我们来看下它的数据结构定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
pub struct Arc<
T: ?Sized,
#[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
> {
ptr: NonNull<ArcInner<T>>,
phantom: PhantomData<ArcInner<T>>,
alloc: A,
}

pub struct Weak<
T: ?Sized,
#[unstable(feature = "allocator_api", issue = "32838")] A: Allocator = Global,
> {
ptr: NonNull<ArcInner<T>>,
alloc: A,
}

其他一些莫名其妙的标记咱就不管了,映入眼帘可以看到一个 ArcInner,这不就是咱的 ArcData 吗!点进去看下:

1
2
3
4
5
6
7
8
9
10
struct ArcInner<T: ?Sized> {
strong: atomic::AtomicUsize,

// the value usize::MAX acts as a sentinel for temporarily "locking" the
// ability to upgrade weak pointers or downgrade strong ones; this is used
// to avoid races in `make_mut` and `get_mut`.
weak: atomic::AtomicUsize,

data: T,
}

好家伙!这不就是咱的 ArcData !

  • strong: 对应我们的 data_ref_count
  • weak: 对应我们的 alloc_ref_count

weak 字段上面的注释也揭示了其核心逻辑跟咱是高度一致的!

我们简单看下最重要的 DropClone 的实现,因为这涉及到引用计数的维护:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const MAX_REFCOUNT: usize = (isize::MAX) as usize;

// Arc: Clone
impl<T: ?Sized, A: Allocator + Clone> Clone for Arc<T, A> {
fn clone(&self) -> Arc<T, A> {
let old_size = self.inner().strong.fetch_add(1, Relaxed); // fetch_add
if old_size > MAX_REFCOUNT { // 溢出保护
abort();
}
// 返回
unsafe { Self::from_inner_in(self.ptr, self.alloc.clone()) }
}
}

// Arc: Drop
unsafe impl<#[may_dangle] T: ?Sized, A: Allocator> Drop for Arc<T, A> {
#[inline]
fn drop(&mut self) {
// fetch_sub release 强引用
if self.inner().strong.fetch_sub(1, Release) != 1 {
return;
}
// 最后一个 Arc 释放的时候,补一个 fence(Acquire),
// 与前面所有的 release 建立 happens-before。
acquire!(self.inner().strong);

// 最后一个 Arc 释放的时候,释放 ArcInner 里面的 data
unsafe {
self.drop_slow();
}
}
}

// Weak: Clone
impl<T: ?Sized, A: Allocator + Clone> Clone for Weak<T, A> {
#[inline]
fn clone(&self) -> Weak<T, A> {
if let Some(inner) = self.inner() {
let old_size = inner.weak.fetch_add(1, Relaxed); // fetch_add 弱引用数量
if old_size > MAX_REFCOUNT { // 溢出保护
abort();
}
}

// 返回 Weak
Weak { ptr: self.ptr, alloc: self.alloc.clone() }
}
}


// Weak: Drop
unsafe impl<#[may_dangle] T: ?Sized, A: Allocator> Drop for Weak<T, A> {
fn drop(&mut self) {
let inner = if let Some(inner) = self.inner() { inner } else { return };

if inner.weak.fetch_sub(1, Release) == 1 { // fetch_sub release 弱引用
acquire!(inner.weak); // 最后一个补一个 fence(acquire)
unsafe { // 释放 ArcInner
self.alloc.deallocate(self.ptr.cast(), Layout::for_value_raw(self.ptr.as_ptr()))
}
}
}
}

可以看到跟咱前面的实现是一样一样的!为啥?因为 Rust Atomics and Locks 一书的作者Mara Bos 就是 Rust 标准库团队的领导之一!牛逼!

总结

通过三轮迭代,我们不仅实现了一个媲美了标准库的 Arc<T> ,还更进一步体验了 Rust 并发抽象的设计哲学

  • v0 —— 能用:单计数 ArcData<T> 解决了跨线程多所有权,但仍缺乏可变视图与断环能力。
  • v1 —— 能改:借助独占 &mut Arc<T> + 原子 ,在 不破坏并发安全 的前提下开放了 get_mut
  • v2 —— 能回收:双计数 + Weak<T> 消除了父子互指造成的泄漏,展示了 弱引用 在所有权图中的价值。
  • v3 —— 更轻巧:将 Weak 独立、用 ManuallyDrop 替换 Option<T>,让没有循环引用需求的场景不再为弱引用买单。

下篇我们将尝试实现一个 Mutex<T>,敬请期待!

Happy Coding! Peace~

附录

1. NonNull<T>

std::ptr::NonNull<T> 是一个 零成本、非空、协变 的裸指针包装器。它本质上只是把一个原始指针塞进 #[repr(transparent)] 的新类型中,但强制保证 绝不为空,因此可以拿到「空值当作枚举判别位」这份额外信息 —— Option<NonNull<T>> 与一个普通指针占用同样大小。

重要特性:

  • 永远非空:创建时若传入空指针即触发 未定义行为。编译器可依赖这一点做优化,例如把 Option<NonNull<T>> 合并为一个指针宽度。
  • 协变:与 *mut T 不同,NonNull<T> 可以在 U: Deref<Target = T> 的场景下安全向子类型转换(因为禁止空值带来了额外保证),这使它非常适合构建自定义智能指针。
  • 无自动 DropNonNull 只存地址,不持有所有权;销毁与释放内存仍由外部逻辑决定(如 Box::from_rawVec::dealloc 等)。

我们将其与裸指针、 Box<T> 智能指针和引用 &T 做一个简单的比较:

特性 *mut T / *const T NonNull<T> Box<T> / &T
是否允许为空 ❌ 必须非空 不适用
协变性 *mut 不协变 协变(可安全向子类型转换) &T 协变
Option 优化 ❌ 多 1 B 判别字节 ✅ 与裸指针同尺寸 已自带优化
所有权 / drop 责任 没有 没有(纯指针)
Send / Sync T 无关 默认 !Send !Sync 取决于 T
常见用途 FFI、底层算法 智能指针内部、侵入式容器、裁掉空判 高层所有权模型

关键的 API:

分组 代表方法 说明
构造 NonNull::new(ptr)Option<NonNull<T>>
unsafe { NonNull::new_unchecked(ptr) }
NonNull::dangling()
安全 / 不安全 / 占位
引用视图 unsafe
fn as_ref / as_mut
把裸指针临时借用成 &T / &mut T
裸指针互转 fn as_ptr 取回 *mut T,解引用仍需 unsafe
类型转换 fn cast<U>
fn cast_mut / cast_const
保留地址,换类型或可变性
地址运算 unsafe fn add / byte_add / sub / offset 指针算术,与 ptr::add 族一致
切片助手 NonNull::slice_from_raw_parts(data, len)
fn len()
1.70+ 稳定,为 [T] 提供非空裸切片构造 (rustwiki.org)

⚠️ 任何把 NonNull 重新解释为引用或解引用的操作,都必须在 unsafe 块里手动保证内存有效性与别名规则。

典型使用场景:

场景 作用
智能指针内部实现 (Box/Rc/Arc) 需要 非空 原始指针存放被管对象,且要在 Option 等场景下节省空间
自研侵入式链表 / 红黑树 节点自身持有前后指针字段 NonNull<Node<T>>,天然避免空判分支
惰性初始化 / Vec::new 先用 NonNull::dangling() 占位,等真正分配后再写入正确地址
FFI C API 明确保证参数永不为空时,用 NonNull 在类型层面表达前置条件
自引用结构(Pin 工作区) 在完成真正初始化前使用 NonNull::dangling() 保存指向自身字段的指针

2. ManuallyDrop<T>

std::mem::ManuallyDrop<T> 是一个零成本(zero-cost)包装器。把值包在 ManuallyDrop 里会告诉编译器:请不要在作用域结束时自动调用它的 Drop 实现,什么时候释放(或是否释放)由我手动决定。

  • ManuallyDrop<T> 只是把 "是否自动 drop" 这一语义从编译器搬到了程序员身上;
  • 不改变 T 的布局,也不会产生额外的字节,因此是零开销;
  • unsafe 责任:你必须保证一份值恰好析构一次(不能漏掉,也不能多调)。

它的核心 API 如表所示:

API 作用 重要注意点
ManuallyDrop::new(value) T 包装成 ManuallyDrop<T>,关闭自动析构 零开销;之后需显式 drop 或移动走
ManuallyDrop::drop(&mut self) (unsafe) 手动触发内部值的析构 必须保证之后不会再访问 / 再 drop
ManuallyDrop::take(&mut self) (unsafe) 从包装里"搬走"值(等价于 ptr::read v 里留下未初始化内存,不能再用或再 drop
ManuallyDrop::into_inner(self) (unsafe) 消费 ManuallyDrop 返回内部值 取得所有权后,原包装已被移走(不会 double-drop)