系列文章:
继上篇 Rust
实战丨手写一个 SpinLock,本篇我们继续参考 Rust Atomics and Locks
一书,来实现一个 oneshot channel
。
在 Go 语言中,有一句名言:
Don't communicate by sharing memory, share memory by communicating. 不要通过共享内存来通信,而要通过通信来共享内存。
讲的就是通道 channel
。使用 channel
来通信,一方面可以避免共享状态的并发竞争问题,另一方面可以解耦生产者和消费者。
channel
根据生产者和消费者的数量,可以分为以下几种:
- 单生产者单消费者 (SPSC)
- 单生产者多消费者 (SPMC)
- 多生产者单消费者 (MPSC)
- 多生产者多消费者 (MPMC)
在单生产者单消费者这个分类中,有一种特殊且常用的场景,叫一次性通道(oneshot channel)。它在 SPSC 的基础上增加了额外约束:整个生命周期内只传递一次数据,传递完成后通道就失效了。
熟悉 Go 语言的读者应该对以下使用场景很熟悉,这些都是典型的 oneshot channel 应用:
1 | func demo1() { |
1 | func demo2() { |
在 Rust 社区里面,就有一个非常优秀的 oneshot
实现,在详细深入它的实现之前,我们先参考 Rust Atomics
and Locks 一书,来尝试实现一个 oneshot channel
!
读完本篇你能学到什么
一次性 (oneshot) 通道的场景与优势
- 了解它与多生产者/多消费者通道的区别
- 掌握常见使用模式(如线程同步、单次结果返回)
Rust 并发核心原语的渐进式实践
UnsafeCell
:内部可变性基石AtomicBool
+Ordering::{Release,Acquire}
:最小化同步原语MaybeUninit<T>
:零成本延迟初始化
用所有权与生命周期设计零误用 API
- 将
Sender
/Receiver
拆分并一次性消费 - 生命周期引用 vs
Arc
的权衡与替换技巧
- 将
线程挂起/唤醒机制
std::thread::park / unpark
的阻塞式等待模型
类型系统层面的“防呆”手段
- 利用
PhantomData
禁止跨线程误用 Drop
手动回收,避免内存泄漏
- 利用
一步步优化的思考路径
- 如何发现问题 → 提出假设 → 实现 → 验证 → 再迭代
带着这些目标,跟随本文一路迭代到 v8,你将拥有一个高性能、零误用的 oneshot channel,以及一整套可迁移到其它并发场景的设计思维。
热身版 v0:基于锁的通道
我们先来实现一个万能版 channel
热热身。顾名思义,channel
分为 2 个功能,send
和 receive
,其中:
send
往channel
一头放数据。receive
从channel
另外一头取数据,如果没有数据,则阻塞住,直到有数据时返回取出数据并返回。
在 Rust 中,我们可以用队列 VecQueue
来作为数据的承载,同时为了对队列访问的并发安全,我们需要使用锁
Mutex
来保护它,另外,在消费者取数据时,如果没有数据,则需要阻塞并等待唤醒(使用循环等待就太耗
CPU 了),所以我们可以使用条件变量 Condvar
来实现挂起和唤醒。
经过以上分析,我们可以定义如下的结构:
1 | use std::{ |
send
和 receive
方法也比较简单,如下所示:
1 | impl<T> Channel<T> { |
这里需要注意的是,self.queue.lock().unwrap()
返回的
b
是一个 MutextGuard
,所以当执行
self.item_ready.wait(b)
的时候,在挂起当前线程的时候,会释放
b
,所以这里不会一直占用锁,而导致其他线程抢不到锁。
这个版本的实现在功能上当然没有问题,但是在性能上还有非常多可以优化的地方,尤其是在锁的使用上,在高并发的情况下,锁的竞争会非常激烈。
OK,热完身后,我们开始基于这个实现,来一步步实现一个高性能的
oneshot channel
!
基础版 v1:unsafe 提醒使用者
1 | pub struct Channel<T> { |
我们先来分析一下,一个 oneshot channel
的结构,需要包含哪些字段。
- 首先它可能会有 0 条数据或 1 条数据,所以很当然,数据可以用一个
Option
来承载。 - 另外,
send
和receive
可以在不同的线程中被调用,所以我们只能用共享只读引用,而不是用mut
独享可变引用,但是send
和receive
都需要对数据进行修改,所以我们这里就需要一个支持内部可变性的数据结构,这个时候,就用到了上篇 Rust 实战丨手写一个 SpinLock 介绍的UnsafeCell<T>
,它允许在共享引用下进行内部可变性修改,是 Rust 并发原语的基石,这里不再赘述。 - 最后,我们需要一个变量来表明是否有数据,为了并发安全,这里可以用
AtomicBool
,为此,我们也增加了一个is_ready
的方法,用于判断数据是否已准备好。对于原子变量,我们使用一对Release
和Acquire
(Release
确保之前的写入对其他线程可见,Acquire
确保能看到之前的Release
写入)来确保原子变量的跨线程可见性。
基于以上分析,我们定出了新的 Channel
结构:
1 | pub struct Channel<T> { |
对应的 send
、receive
和
is_ready
实现如下:
1 | impl<T> Channel<T> { |
在这个版本中:
- 我们暂且使用
unsafe
加注释的方式,来 提醒 使用者,send
和receive
只能被调用一次,同时,在调用receive
之前,必须先使用is_ready
进行数据检查。 - 对于原子变量,我们使用一对 Release 和 Acquire 来确保原子变量的跨线程可见性,具体可参考 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序。
另外别忘了,UnsafeCell<T>
是不支持
Sync
的,所以为了我们的 Channel
可以跨线程使用,我们需要为其实现 Sync
trait:
1 | // 1. Channel<T> 可以在不同的线程中被分别执行 send 和 receive,所以它的引用可以在线程中共享,所以需要实现 Sync; |
使用方法如下:
1 |
|
基础版 v2:使用 MaybeUninit 替代 Option 减少内存开销
我们先来思考一个问题:Option<T> 的内存占用是多少?
结论是::
Option<T>
相比于 T,可能需要额外消耗标记位和填充位的空间。具体可参考附录:1. Option<T> 的内存占用是多少。
另外一点是,Option<T>
其实已经包含了是否存在值的信息了,它跟 ready
这个标志的作用其实重复了,有一些浪费。
在当下场景,我们可以使用另外一个数据结构来替代
Option<T>
——
MaybeUninit<T>
,相比于
Option<T>
,它有以下优势:
- 内存占用优化:在
Option<T>
中,对于非空指针优化(Niche Optimization)的类型,None
会占用额外的空间(一个字节的标签+可能的对齐填充)。而MaybeUninit<T>
本身就是一个大小与T
相同的未初始化内存,它没有标签,因此不会引入额外的内存开销。 - 避免初始化开销:使用
Option<T>
时,在初始化时设置为None
,实际上会写入一个表示None
的值(即进行初始化)。而MaybeUninit<T>
的uninit()
不会对内存进行任何初始化,这在性能敏感的场景下可以避免不必要的初始化开销(特别是当T
很大时)。 - 更灵活地控制初始化:在通道的实现中,消息可能由生产者写入,然后通过设置
ready
标志来通知消费者。使用MaybeUninit
允许我们延迟初始化,直到实际需要写入消息的时候。这样,在通道创建时,我们不需要为T
类型的值进行任何初始化(即使是None
),而是留出一块未初始化的内存,在后续由生产者写入实际的值。 - 与原子标志配合更高效:在上个版本的视线中,
ready
是一个AtomicBool
,用于指示消息是否就绪。在Option<T>
版本中,我们需要检查Option
是否为Some
,同时还要检查ready
标志。而使用MaybeUninit
后,我们完全依赖ready
标志来判断消息是否可用,避免了双重检查(因为MaybeUninit
本身不携带状态,所以状态完全由ready
控制)。这样,结构体的内存布局更紧凑,且访问模式更直接。 - 潜在的性能提升:由于避免了额外的标签和初始化,以及更紧凑的内存布局,可能会提高缓存利用率,从而提升性能。
MaybeUninit<T>
有以下常用方法:
常用方法 | 作用 | 安全级别 |
---|---|---|
MaybeUninit::uninit() |
创建一块完全未初始化的内存 | const fn 、safe |
as_mut_ptr() / as_ptr() |
取出裸指针,供外部写入或读取 | safe |
assume_init() / assume_init_read() |
告诉编译器“这里已经是一个合法的 T 了”,并返回它 |
unsafe (因为你得保证真初始化过) |
write(val) |
按位把 val 复制/移动
到这块未初始化内存;此后视为已初始化 |
unsafe |
经过上面一顿分析,我们来使用 MaybeUninit<T>
来替代
Option<T>
,进一步减少内存占用和提升性能,新的
Channel
结构如下:
1 | pub struct Channel<T> { |
对应的 send
和 receive
实现更新如下:
1 | impl<T> Channel<T> { |
其中 send
中,(*self.message.get())
从
UnsafeCell<MaybeUninit<T>>
中取出
MaybeUninit<T>
,然后调用 write
方法把
message
写入这块未初始化的内存中,此后视为已初始化,并可以使用使用
assume_init_read
进行读取。
修改后,测试代码没有发生变化,我们执行之前的测试代码,发现还是可以通过的!
基础版 v3:增加动态检查提高安全性
上述版本中,我们通过 unsafe
和注释去“要求”调用者严格遵循以下约束:
send
和receive
最多只调用一次。receive
调用之前,必须先经过is_ready
的检查。
在这个版本中,我们加一下动态检查,如果调用者不按要求做事,那就直接
panic
给出告警。
在 receive
中,我们需要做 2 点保证:① 已经有数据了,②
数据只被消耗了一次。
1 | pub unsafe fn receive(&self) -> T { |
这里我们使用 swap
,将 ready
从
false
转为 true
,达到了 2 个目的:
- 如果返回了
false
,则说明之前的ready
为false
,即数据没准备好。 - 如果返回了
true
,则说明数据已经准备好了,这个时候,也已经将ready
置为false
,这样后面调用的receive
也将失败。
对于
send
,我们需要保证只写入一次,所以这里我们需要引入一个新的变量
in_user
,表示 send
是否已经使用了:
1 | pub struct Channel<T> { |
在 send
中,我们依旧使用 swap
,来将
in_use
从转为 true
,如果返回
true
,则说明之前已经执行过 send
了,这个时候将执行 panic 进行告警。
1 | /// Panics when trying to send more than one message. |
通过上述的 2 个优化,我们的 Channel
又“安全”了一丢丢!
基础版 v4:实现 Drop 自动清理无用内存
因为我们使用了
MaybeUninit<T>
,所以我们需要自己管理 T
的内存管理,但在上述的实现中,可能存在一种情况,导致内存得不到释放:我们只执行了
send
,但直到 Channel
超过作用域的时候,都没有被 receive
。
为此,我们可以为 Channel
实现 Drop
trait,当有数据的时候,对MaybeUninit<T>
进行内存释放:
1 | impl<T> Drop for Channel<T> { |
安全非阻塞版 v5:提供安全方法,减少使用者误用
在这个版本中,我们来解决前面实现的最大问题:方法是不安全的,严重依赖调用者的自觉性,没有充分发挥 Rust 强大编译器的检查能力。
回顾我们的需求:我们要实现的是一个
oneshot channel
,即只能调用一次 send
和
receive
。
第一个问题是:如何利用 Rust 天然的编译器检查能力来约束这一点呢?很明显,就是所有权机制!什么东西只能执行一次呢?消耗所有权的东西!
1 | // 对于第一个参数为 self 的方法,执行时,会转移所有权,执行后,原变量就不能再用了,因为所有权已经转移了。 |
好,那第二个问题就来了:self
方法只能调用一次,但很明显我们总共需要 2 次的调用(send
和
receive
),所以这里我们可以将 Channel
进行拆开,分成 Sender
和 Receiver
。
那第三个问题也就随之而来了,Sender
和
Receiver
都需要持有
Channel
,并且可能处于不同的线程,这里我们可以先用
Arc
来对 Channel
进行引用。
解决了上述 3 个问题,我们可以梳理新的数据结构:
1 | pub struct Sender<T> { |
Channel
中移除了in_use
属性,因为我们已经有self
做所有权检查了,不再需要in_use
来避免重复调用send
了。- 新增了
Sender<T>
和Receiver<T>
两个结构,它们都各自持有了一个Arc<Channel>
。
对应的 send
和 receive
方法当然也就转移到
Sender<T>
和 Receiver<T>
身上了,实现也和之前基本一致:
1 | impl<T> Sender<T> { |
修改了结构了,我们需要修改对应的测试代码:
1 |
|
在最新的测试代码中,我们已经不再需要 unsafe
代码了!这对于使用者来说,就非常友好了!
而且这个时候,你如果尝试执行多次 send
和
receive
的时候,编译器就会报错了!
不过它还是有 2 个缺点:
Arc
的复制还是有一些开销的。- 我们依旧依赖使用者提前用
is_ready
来检查,否则直接调用receive
就有可能会panic
。
我们先来解决第 1 个问题。
安全非阻塞版 v6:使用生命周期加引用,避免 Arc 的复制开销
为了避免 Arc 的开销,我们需要在 Sender<T>
和
Receiver<T>
中持有 Channel<T>
的引用,而引用的对象的生命周期是不确定的,所以我们需要加入生命周期标注,来告诉编译器我们的引用是逻辑自洽的。
新的结构和构造函数如下:
1 | pub struct Sender<'a, T> { |
在这个版本的实现中,我们新增了 split
方法:
- 其中参数
&'a mut self
表明它是一个独占引用,即channel.split()
不会有并发问题。 - 第一行代码
*self = Self::new()
我们对原有的Channel
进行重置,保证拆分之前通道里绝对没有残留数据,避免旧消息被下一对Sender/Receiver
误使用。 'a
直接来自于&'a mut self
,保证两端把手绝不会比原始Channel
活得更久。
新的测试代码如下:
1 |
|
安全阻塞版 v7:去掉 is_ready 完全避免使用者误调用
截止目前的实现版本中,我们还是依赖使用者在执行 receive
之前先执行 is_ready
进行数据检查,还是存在一定的误操作性。现在我们来实现一个完全阻塞的版本,来完全避免这个情况。
我们需要做几件事情:
- 去掉
is_ready
方法; - 在
receive
中,根据ready
判断是否存在数据:- 如果存在,则直接取出数据并返回;
- 如果不存在,则需要先挂起当前线程,等待唤醒(直接 CPU 循环检查肯定可以,但不够优雅!咱不干!);
- 在
send
中,放入数据后,尝试唤醒可能处于挂起中的线程。
那现在最重要的一个问题是:如何唤醒处于挂起中的线程?更进一步,唤醒哪个线程?
这里其实是说不定的,因为 Sender
和 Receiver
都可能被放入任何一个线程中,不过在 Rust Atomics
and Locks 书中,作者假定了 Receiver
会固定在调用
split
的那个线程。
笔者认为这个假设是简单且有效的,回顾一下我们前面举的 Go 语言的 2 个例子:
1 | func demo1() { |
1 | func demo2() { |
在这两个最常见的 oneshot channel
的例子中,Receiver
就是处于调用 split
的线程中。所以我们可以基于这个假设来实现这个版本。
先回顾下 Thread
2 个最核心的方法:
Thread.park(thread)
: 挂起线程,等待唤醒。thread.unpark()
: 唤醒线程。
首先我们需要在 Sender
中保存待唤醒的线程:
1 | pub struct Sender<'a, T> { |
在 split()
的时候,我们需要获取当前线程并保存在
Sender
中:
1 | pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) { |
在 recevie
的时候,如果没有数据,我们就可以挂起当前线程,等待唤醒:
1 | impl<T> Receiver<'_, T> { |
在 send
完数据后,唤醒可能挂起的线程:
1 | impl<T> Sender<'_, T> { |
最后删除之前的 is_ready
方法,然后更新我们的测试代码:
1 |
|
最终版 v8:使用 PhantomData 来保证 Receiver 处于 split() 线程
是不是觉得,上述实现已经完美无瑕了!其实不然,我们虽然假设了
Receiver
处于调用 split()
的线程中,但是还是无法阻止使用者将 Receiver
转移到其他线程。
再次回顾下我们现在的 Channel
和
Receiver
:
1 | pub struct Receiver<'a, T> { |
我们为 Channel<T>
实现了 Sync
trait,而标准库中有这 2 行代码:
1 | impl<T: ?Sized + Sync> Sync for &T {} |
所以 &Channel<T>
实现了 Sync/Send
trait,而 Receiver
只持有了一个
&'a Channel<T>
,所以它也是 Send
的!
所以 Receiver
是可以被转移到其他线程的,即下述的测试代码在编译上也是通过的:
1 |
|
但是我们执行后会发现,receiver.receive()
会被永久阻塞住,这是因为 sender.send(1)
只会唤醒执行
split()
的线程。
为了避免这种情况的发生,我们需要强行防止 Receiver
实现
Send
trait!
怎么办呢?我们需要做到 2 件事情:
- 让
Receiver
持有一个非Sync
的属性; - 这个属性除了标记没有其他作用,最好不要占用任何的资源。
这里我们介绍一位新朋友:PhantomData
:
PhantomData
是一个零大小类型(Zero-Sized Type, ZST),用于在编译期向类型系统传递额外信息,而不占用运行时内存。我们可以用它在包一个
!Send
的类型,这样Receiver
就是!Send
的了。关于PhantomData
的更多介绍,可以参考附录 2:PhantomData。
在介绍完 PhantomData
后,我们就可以使用它来防止
Receiver
实现 Send
trait 了:
1 | pub struct Receiver<'a, T> { |
其中 *const()
是 !Send
的,所以我们的
Receiver
再也不会被转移到其他线程了,而 send
是要求 self
,所以即便是 Sync
的也无所谓了,因为无法通过引用来执行 receive()
方法。
现在我们可以再次执行上面的测试代码(强行将 Receiver 移动到其他线程中),将会得到以下的报错:
1 | error[E0277]: `*const ()` cannot be sent between threads safely |
到这里,通过 8
个版本,我们一步步实现了一个高性能、低内存占用且安全可用的
oneshot channel
了!
总结
至此,通过 8 个小版本,我们不仅手写了一个 高性能、安全友好的 oneshot channel,还更进一步体验了 Rust 在并发领域 “以类型系统驱动正确性” 的威力。我们来做一个简单的小结。
关键收获:
版本 | 新增能力 | 解决了什么问题 |
---|---|---|
v0 | Mutex + Condvar 通用通道 |
打开话题、对比后续无锁方案 |
v1 | UnsafeCell<Option<T>> +
AtomicBool |
去锁化、最小可行一次性通道 |
v2 | 替换为 MaybeUninit<T> |
节省内存&避免双状态检查 |
v3 | 运行时检查 (swap ) |
阻止未准备/二次调用导致 UB |
v4 | Drop 清理 |
防止“只 send 不 recv”泄漏 |
v5 | Sender / Receiver 所有权 API |
编译期保证“仅调用一次” |
v6 | 生命周期引用替换 Arc |
消除引用计数开销 |
v7 | park / unpark 阻塞模型 |
使用者不再需要轮询 is_ready |
v8 | PhantomData 防跨线程误用 |
类型系统彻底封死错误用法 |
核心记忆点:
- 内部可变性:
UnsafeCell
是所有并发原语的基石。 - 延迟初始化:
MaybeUninit<T>
+ “就绪标志” 是零成本组合。 - Release / Acquire:最轻量的跨线程可见性保障。
- 所有权设计 API:让编译器替你兜底逻辑约束。
- PhantomData:零大小但能影响
Send/Sync
的类型级标记。 - 迭代思路:先跑通,再收口安全性与性能,最后用类型系统“防呆”。
完整的代码可以参考:conutils-oneshot。
oneshot crate 浅探
附录
1. Option<T> 的内存占用是多少?
在 Rust 中,Option<T>
类型占用的内存,取决于泛型参数 T
的类型特性。具体可分为两种情况:
- 当 T
是非指针类型(如基本类型、结构体等)时,
Option<T>
需要额外的空间存储 Some 或 None 的标签,此时:- 内存布局:包含一个 1 字节的标签(标识 Some 或 None)和
T
类型的数据空间(可能包含对齐填充)。 - 即使为
None
,仍需保留T
所需的内存空间(含填充),以保障枚举值大小统一。如Option<i32>
占用 8 字节(1 字节标签 + 4 字节 i32 + 3 字节填充)。
- 内存布局:包含一个 1 字节的标签(标识 Some 或 None)和
- 当 T 是不可为空的指针类型(如
Box<T>、&T、&mut T)时,Rust
编译器会启用空指针优化(Niche
Optimization)。即利用指针不能为 0 的特性,将
None
标识为全零位模式(0x00),而Some(ptr)
存储实际指针地址,此时无需额外标签。这个时候,None
和Some(T)
不占用任何的额外空间,大小与 T 相同。
我们可以写个程序来简单验证一下:
1 | fn test_option() { |
在笔者的电脑下,输出如下:
1 | 基本类型: |
通过输出我们可以观察到经过空指针优化,Option<Box<i32>>
的 None
和 Some
都只占 8
字节(与 Box<i32>
相同),同时通过为
None
复用类型的无效位模式(如
0x0
)消除枚举标签,实现零成本抽象。
2. PhantomData
Rust 中的 PhantomData
是一个零大小类型(Zero-Sized Type,
ZST),用于在编译期向类型系统传递额外信息,而不占用运行时内存。它在泛型编程、生命周期管理和所有权标记中扮演关键角色。
它有以下的核心特性和作用:
零内存开销:
PhantomData<T>
本身不存储任何数据,编译后会被优化掉,因此不会增加结构体的实际内存占用。1
2
3
4
5use std::marker::PhantomData;
struct Wrapper<T> {
data: u32,
_marker: PhantomData<T>, // 不占空间
}标记未使用的泛型参数:Rust 要求泛型参数必须在结构体中被显式使用。若泛型参数未直接出现在字段中,可通过
PhantomData
标记其存在性,避免编译错误。1
2
3
4struct Resource<T> {
handle: *mut (),
_phantom: PhantomData<T>, // 标记类型 T
}声明生命周期依赖:当结构体包含原始指针(如
*const T
)时,PhantomData
可绑定生命周期,确保引用的数据有效性。1
2
3
4
5struct Slice<'a, T> {
start: *const T,
end: *const T,
_phantom: PhantomData<&'a T>, // 绑定生命周期 'a
}协变与逆变控制:通过
PhantomData<&'a T>
或PhantomData<*mut T>
等不同形式,调整类型的协变/逆变行为。1
2
3
4
5
6
7use std::marker::PhantomData;
use std::rc::Rc;
// 标记类型为 !Send 且 !Sync
struct NotThreadSafe {
_marker: PhantomData<Rc<()>>, // Rc<()> 本身是 !Send + !Sync
}