系列文章:
- Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序
- Rust 原理丨从汇编角度看原子操作
- Rust 实战丨手写一个 SpinLock
- Rust 实战丨手写一个 oneshot channel 👈 本篇
- Rust 实战丨手写一个 Arc
- Rust 原理丨操作系统并发原语
- Rust 实战丨手写一个 Mutex
- Rust 实战丨手写一个 Condvar
- Rust 实战丨手写一个 RwLock
继上篇 Rust
实战丨手写一个 SpinLock,本篇我们继续参考 Rust Atomics and Locks
一书,来实现一个 oneshot channel。
在 Go 语言中,有一句名言:
Don't communicate by sharing memory, share memory by communicating. 不要通过共享内存来通信,而要通过通信来共享内存。
讲的就是通道 channel。使用 channel
来通信,一方面可以避免共享状态的并发竞争问题,另一方面可以解耦生产者和消费者。
channel 根据生产者和消费者的数量,可以分为以下几种:
- 单生产者单消费者 (SPSC)
- 单生产者多消费者 (SPMC)
- 多生产者单消费者 (MPSC)
- 多生产者多消费者 (MPMC)
在单生产者单消费者这个分类中,有一种特殊且常用的场景,叫一次性通道(oneshot channel)。它在 SPSC 的基础上增加了额外约束:整个生命周期内只传递一次数据,传递完成后通道就失效了。
熟悉 Go 语言的读者应该对以下使用场景很熟悉,这些都是典型的 oneshot channel 应用:
1 | func demo1() { |
1 | func demo2() { |
在 Rust 社区里面,就有一个非常优秀的 oneshot
实现,在详细深入它的实现之前,我们先参考 Rust Atomics
and Locks 一书,来尝试实现一个 oneshot channel!
读完本篇你能学到什么
一次性 (oneshot) 通道的场景与优势
- 了解它与多生产者/多消费者通道的区别
- 掌握常见使用模式(如线程同步、单次结果返回)
Rust 并发核心原语的渐进式实践
UnsafeCell:内部可变性基石AtomicBool+Ordering::{Release,Acquire}:最小化同步原语MaybeUninit<T>:零成本延迟初始化
用所有权与生命周期设计零误用 API
- 将
Sender/Receiver拆分并一次性消费 - 生命周期引用 vs
Arc的权衡与替换技巧
- 将
线程挂起/唤醒机制
std::thread::park / unpark的阻塞式等待模型
类型系统层面的“防呆”手段
- 利用
PhantomData禁止跨线程误用 Drop手动回收,避免内存泄漏
- 利用
一步步优化的思考路径
- 如何发现问题 → 提出假设 → 实现 → 验证 → 再迭代
带着这些目标,跟随本文一路迭代到 v8,你将拥有一个高性能、零误用的 oneshot channel,以及一整套可迁移到其它并发场景的设计思维。
热身版 v0:基于锁的通道
我们先来实现一个万能版 channel
热热身。顾名思义,channel 分为 2 个功能,send
和 receive,其中:
send往channel一头放数据。receive从channel另外一头取数据,如果没有数据,则阻塞住,直到有数据时返回取出数据并返回。
在 Rust 中,我们可以用队列 VecQueue
来作为数据的承载,同时为了对队列访问的并发安全,我们需要使用锁
Mutex
来保护它,另外,在消费者取数据时,如果没有数据,则需要阻塞并等待唤醒(使用循环等待就太耗
CPU 了),所以我们可以使用条件变量 Condvar
来实现挂起和唤醒。
经过以上分析,我们可以定义如下的结构:
1 | use std::{ |
send 和 receive
方法也比较简单,如下所示:
1 | impl<T> Channel<T> { |
这里需要注意的是,self.queue.lock().unwrap() 返回的
b 是一个 MutextGuard,所以当执行
self.item_ready.wait(b)
的时候,在挂起当前线程的时候,会释放
b,所以这里不会一直占用锁,而导致其他线程抢不到锁。
这个版本的实现在功能上当然没有问题,但是在性能上还有非常多可以优化的地方,尤其是在锁的使用上,在高并发的情况下,锁的竞争会非常激烈。
OK,热完身后,我们开始基于这个实现,来一步步实现一个高性能的
oneshot channel!
基础版 v1:unsafe 提醒使用者
1 | pub struct Channel<T> { |
我们先来分析一下,一个 oneshot channel
的结构,需要包含哪些字段。
- 首先它可能会有 0 条数据或 1 条数据,所以很当然,数据可以用一个
Option来承载。 - 另外,
send和receive可以在不同的线程中被调用,所以我们只能用共享只读引用,而不是用mut独享可变引用,但是send和receive都需要对数据进行修改,所以我们这里就需要一个支持内部可变性的数据结构,这个时候,就用到了上篇 Rust 实战丨手写一个 SpinLock 介绍的UnsafeCell<T>,它允许在共享引用下进行内部可变性修改,是 Rust 并发原语的基石,这里不再赘述。 - 最后,我们需要一个变量来表明是否有数据,为了并发安全,这里可以用
AtomicBool,为此,我们也增加了一个is_ready的方法,用于判断数据是否已准备好。对于原子变量,我们使用一对Release和Acquire(Release确保之前的写入对其他线程可见,Acquire确保能看到之前的Release写入)来确保原子变量的跨线程可见性。
基于以上分析,我们定出了新的 Channel 结构:
1 | pub struct Channel<T> { |
对应的 send、receive 和
is_ready 实现如下:
1 | impl<T> Channel<T> { |
在这个版本中:
- 我们暂且使用
unsafe加注释的方式,来 提醒 使用者,send和receive只能被调用一次,同时,在调用receive之前,必须先使用is_ready进行数据检查。 - 对于原子变量,我们使用一对 Release 和 Acquire 来确保原子变量的跨线程可见性,具体可参考 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序。
另外别忘了,UnsafeCell<T> 是不支持
Sync 的,所以为了我们的 Channel
可以跨线程使用,我们需要为其实现 Sync trait:
1 | // 1. Channel<T> 可以在不同的线程中被分别执行 send 和 receive,所以它的引用可以在线程中共享,所以需要实现 Sync; |
使用方法如下:
1 |
|
基础版 v2:使用 MaybeUninit 替代 Option 减少内存开销
我们先来思考一个问题:Option<T> 的内存占用是多少?
结论是::
Option<T>相比于 T,可能需要额外消耗标记位和填充位的空间。具体可参考附录:1. Option<T> 的内存占用是多少。
另外一点是,Option<T>
其实已经包含了是否存在值的信息了,它跟 ready
这个标志的作用其实重复了,有一些浪费。
在当下场景,我们可以使用另外一个数据结构来替代
Option<T> ——
MaybeUninit<T>,相比于
Option<T>,它有以下优势:
- 内存占用优化:在
Option<T>中,对于非空指针优化(Niche Optimization)的类型,None会占用额外的空间(一个字节的标签+可能的对齐填充)。而MaybeUninit<T>本身就是一个大小与T相同的未初始化内存,它没有标签,因此不会引入额外的内存开销。 - 避免初始化开销:使用
Option<T>时,在初始化时设置为None,实际上会写入一个表示None的值(即进行初始化)。而MaybeUninit<T>的uninit()不会对内存进行任何初始化,这在性能敏感的场景下可以避免不必要的初始化开销(特别是当T很大时)。 - 更灵活地控制初始化:在通道的实现中,消息可能由生产者写入,然后通过设置
ready标志来通知消费者。使用MaybeUninit允许我们延迟初始化,直到实际需要写入消息的时候。这样,在通道创建时,我们不需要为T类型的值进行任何初始化(即使是None),而是留出一块未初始化的内存,在后续由生产者写入实际的值。 - 与原子标志配合更高效:在上个版本的视线中,
ready是一个AtomicBool,用于指示消息是否就绪。在Option<T>版本中,我们需要检查Option是否为Some,同时还要检查ready标志。而使用MaybeUninit后,我们完全依赖ready标志来判断消息是否可用,避免了双重检查(因为MaybeUninit本身不携带状态,所以状态完全由ready控制)。这样,结构体的内存布局更紧凑,且访问模式更直接。 - 潜在的性能提升:由于避免了额外的标签和初始化,以及更紧凑的内存布局,可能会提高缓存利用率,从而提升性能。
MaybeUninit<T> 有以下常用方法:
| 常用方法 | 作用 | 安全级别 |
|---|---|---|
MaybeUninit::uninit() |
创建一块完全未初始化的内存 | const fn、safe |
as_mut_ptr() / as_ptr() |
取出裸指针,供外部写入或读取 | safe |
assume_init() / assume_init_read() |
告诉编译器“这里已经是一个合法的 T 了”,并返回它 |
unsafe(因为你得保证真初始化过) |
write(val) |
按位把 val 复制/移动
到这块未初始化内存;此后视为已初始化 |
unsafe |
经过上面一顿分析,我们来使用 MaybeUninit<T> 来替代
Option<T>,进一步减少内存占用和提升性能,新的
Channel 结构如下:
1 | pub struct Channel<T> { |
对应的 send 和 receive 实现更新如下:
1 | impl<T> Channel<T> { |
其中 send 中,(*self.message.get()) 从
UnsafeCell<MaybeUninit<T>> 中取出
MaybeUninit<T>,然后调用 write 方法把
message
写入这块未初始化的内存中,此后视为已初始化,并可以使用使用
assume_init_read 进行读取。
修改后,测试代码没有发生变化,我们执行之前的测试代码,发现还是可以通过的!
基础版 v3:增加动态检查提高安全性
上述版本中,我们通过 unsafe
和注释去“要求”调用者严格遵循以下约束:
send和receive最多只调用一次。receive调用之前,必须先经过is_ready的检查。
在这个版本中,我们加一下动态检查,如果调用者不按要求做事,那就直接
panic 给出告警。
在 receive 中,我们需要做 2 点保证:① 已经有数据了,②
数据只被消耗了一次。
1 | pub unsafe fn receive(&self) -> T { |
这里我们使用 swap,将 ready 从
false 转为 true,达到了 2 个目的:
- 如果返回了
false,则说明之前的ready为false,即数据没准备好。 - 如果返回了
true,则说明数据已经准备好了,这个时候,也已经将ready置为false,这样后面调用的receive也将失败。
对于
send,我们需要保证只写入一次,所以这里我们需要引入一个新的变量
in_user,表示 send 是否已经使用了:
1 | pub struct Channel<T> { |
在 send 中,我们依旧使用 swap,来将
in_use 从转为 true,如果返回
true,则说明之前已经执行过 send
了,这个时候将执行 panic 进行告警。
1 | /// Panics when trying to send more than one message. |
通过上述的 2 个优化,我们的 Channel
又“安全”了一丢丢!
基础版 v4:实现 Drop 自动清理无用内存
因为我们使用了
MaybeUninit<T>,所以我们需要自己管理 T
的内存管理,但在上述的实现中,可能存在一种情况,导致内存得不到释放:我们只执行了
send,但直到 Channel
超过作用域的时候,都没有被 receive。
为此,我们可以为 Channel 实现 Drop
trait,当有数据的时候,对MaybeUninit<T>
进行内存释放:
1 | impl<T> Drop for Channel<T> { |
安全非阻塞版 v5:提供安全方法,减少使用者误用
在这个版本中,我们来解决前面实现的最大问题:方法是不安全的,严重依赖调用者的自觉性,没有充分发挥 Rust 强大编译器的检查能力。
回顾我们的需求:我们要实现的是一个
oneshot channel,即只能调用一次 send 和
receive。
第一个问题是:如何利用 Rust 天然的编译器检查能力来约束这一点呢?很明显,就是所有权机制!什么东西只能执行一次呢?消耗所有权的东西!
1 | // 对于第一个参数为 self 的方法,执行时,会转移所有权,执行后,原变量就不能再用了,因为所有权已经转移了。 |
好,那第二个问题就来了:self
方法只能调用一次,但很明显我们总共需要 2 次的调用(send 和
receive),所以这里我们可以将 Channel
进行拆开,分成 Sender 和 Receiver。
那第三个问题也就随之而来了,Sender 和
Receiver 都需要持有
Channel,并且可能处于不同的线程,这里我们可以先用
Arc 来对 Channel 进行引用。
解决了上述 3 个问题,我们可以梳理新的数据结构:
1 | pub struct Sender<T> { |
Channel中移除了in_use属性,因为我们已经有self做所有权检查了,不再需要in_use来避免重复调用send了。- 新增了
Sender<T>和Receiver<T>两个结构,它们都各自持有了一个Arc<Channel>。
对应的 send 和 receive 方法当然也就转移到
Sender<T> 和 Receiver<T>
身上了,实现也和之前基本一致:
1 | impl<T> Sender<T> { |
修改了结构了,我们需要修改对应的测试代码:
1 |
|
在最新的测试代码中,我们已经不再需要 unsafe
代码了!这对于使用者来说,就非常友好了!
而且这个时候,你如果尝试执行多次 send 和
receive 的时候,编译器就会报错了!
不过它还是有 2 个缺点:
Arc的复制还是有一些开销的。- 我们依旧依赖使用者提前用
is_ready来检查,否则直接调用receive就有可能会panic。
我们先来解决第 1 个问题。
安全非阻塞版 v6:使用生命周期加引用,避免 Arc 的复制开销
为了避免 Arc 的开销,我们需要在 Sender<T> 和
Receiver<T> 中持有 Channel<T>
的引用,而引用的对象的生命周期是不确定的,所以我们需要加入生命周期标注,来告诉编译器我们的引用是逻辑自洽的。
新的结构和构造函数如下:
1 | pub struct Sender<'a, T> { |
在这个版本的实现中,我们新增了 split 方法:
- 其中参数
&'a mut self表明它是一个独占引用,即channel.split()不会有并发问题。 - 第一行代码
*self = Self::new()我们对原有的Channel进行重置,保证拆分之前通道里绝对没有残留数据,避免旧消息被下一对Sender/Receiver误使用。 'a直接来自于&'a mut self,保证两端把手绝不会比原始Channel活得更久。
新的测试代码如下:
1 |
|
安全阻塞版 v7:去掉 is_ready 完全避免使用者误调用
截止目前的实现版本中,我们还是依赖使用者在执行 receive
之前先执行 is_ready
进行数据检查,还是存在一定的误操作性。现在我们来实现一个完全阻塞的版本,来完全避免这个情况。
我们需要做几件事情:
- 去掉
is_ready方法; - 在
receive中,根据ready判断是否存在数据:- 如果存在,则直接取出数据并返回;
- 如果不存在,则需要先挂起当前线程,等待唤醒(直接 CPU 循环检查肯定可以,但不够优雅!咱不干!);
- 在
send中,放入数据后,尝试唤醒可能处于挂起中的线程。
那现在最重要的一个问题是:如何唤醒处于挂起中的线程?更进一步,唤醒哪个线程?
这里其实是说不定的,因为 Sender 和 Receiver
都可能被放入任何一个线程中,不过在 Rust Atomics
and Locks 书中,作者假定了 Receiver 会固定在调用
split 的那个线程。
笔者认为这个假设是简单且有效的,回顾一下我们前面举的 Go 语言的 2 个例子:
1 | func demo1() { |
1 | func demo2() { |
在这两个最常见的 oneshot channel
的例子中,Receiver 就是处于调用 split
的线程中。所以我们可以基于这个假设来实现这个版本。
先回顾下 Thread 2 个最核心的方法:
Thread.park(thread): 挂起线程,等待唤醒。thread.unpark(): 唤醒线程。
首先我们需要在 Sender 中保存待唤醒的线程:
1 | pub struct Sender<'a, T> { |
在 split() 的时候,我们需要获取当前线程并保存在
Sender 中:
1 | pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) { |
在 recevie
的时候,如果没有数据,我们就可以挂起当前线程,等待唤醒:
1 | impl<T> Receiver<'_, T> { |
在 send 完数据后,唤醒可能挂起的线程:
1 | impl<T> Sender<'_, T> { |
最后删除之前的 is_ready
方法,然后更新我们的测试代码:
1 |
|
最终版 v8:使用 PhantomData 来保证 Receiver 处于 split() 线程
是不是觉得,上述实现已经完美无瑕了!其实不然,我们虽然假设了
Receiver 处于调用 split()
的线程中,但是还是无法阻止使用者将 Receiver
转移到其他线程。
再次回顾下我们现在的 Channel 和
Receiver:
1 | pub struct Receiver<'a, T> { |
我们为 Channel<T> 实现了 Sync
trait,而标准库中有这 2 行代码:
1 | impl<T: ?Sized + Sync> Sync for &T {} |
所以 &Channel<T> 实现了 Sync/Send
trait,而 Receiver 只持有了一个
&'a Channel<T>,所以它也是 Send
的!
所以 Receiver
是可以被转移到其他线程的,即下述的测试代码在编译上也是通过的:
1 |
|
但是我们执行后会发现,receiver.receive()
会被永久阻塞住,这是因为 sender.send(1) 只会唤醒执行
split() 的线程。
为了避免这种情况的发生,我们需要强行防止 Receiver 实现
Send trait!
怎么办呢?我们需要做到 2 件事情:
- 让
Receiver持有一个非Sync的属性; - 这个属性除了标记没有其他作用,最好不要占用任何的资源。
这里我们介绍一位新朋友:PhantomData:
PhantomData是一个零大小类型(Zero-Sized Type, ZST),用于在编译期向类型系统传递额外信息,而不占用运行时内存。我们可以用它在包一个
!Send的类型,这样Receiver就是!Send的了。关于PhantomData的更多介绍,可以参考附录 2:PhantomData。
在介绍完 PhantomData 后,我们就可以使用它来防止
Receiver 实现 Send trait 了:
1 | pub struct Receiver<'a, T> { |
其中 *const() 是 !Send 的,所以我们的
Receiver 再也不会被转移到其他线程了,而 send
是要求 self,所以即便是 Sync
的也无所谓了,因为无法通过引用来执行 receive() 方法。
现在我们可以再次执行上面的测试代码(强行将 Receiver 移动到其他线程中),将会得到以下的报错:
1 | error[E0277]: `*const ()` cannot be sent between threads safely |
到这里,通过 8
个版本,我们一步步实现了一个高性能、低内存占用且安全可用的
oneshot channel 了!
总结
至此,通过 8 个小版本,我们不仅手写了一个 高性能、安全友好的 oneshot channel,还更进一步体验了 Rust 在并发领域 “以类型系统驱动正确性” 的威力。我们来做一个简单的小结。
关键收获:
| 版本 | 新增能力 | 解决了什么问题 |
|---|---|---|
| v0 | Mutex + Condvar 通用通道 |
打开话题、对比后续无锁方案 |
| v1 | UnsafeCell<Option<T>> +
AtomicBool |
去锁化、最小可行一次性通道 |
| v2 | 替换为 MaybeUninit<T> |
节省内存&避免双状态检查 |
| v3 | 运行时检查 (swap) |
阻止未准备/二次调用导致 UB |
| v4 | Drop 清理 |
防止“只 send 不 recv”泄漏 |
| v5 | Sender / Receiver 所有权 API |
编译期保证“仅调用一次” |
| v6 | 生命周期引用替换 Arc |
消除引用计数开销 |
| v7 | park / unpark 阻塞模型 |
使用者不再需要轮询 is_ready |
| v8 | PhantomData 防跨线程误用 |
类型系统彻底封死错误用法 |
核心记忆点:
- 内部可变性:
UnsafeCell是所有并发原语的基石。 - 延迟初始化:
MaybeUninit<T>+ “就绪标志” 是零成本组合。 - Release / Acquire:最轻量的跨线程可见性保障。
- 所有权设计 API:让编译器替你兜底逻辑约束。
- PhantomData:零大小但能影响
Send/Sync的类型级标记。 - 迭代思路:先跑通,再收口安全性与性能,最后用类型系统“防呆”。
完整的代码可以参考:conutils-oneshot。
oneshot crate 浅探
附录
1. Option<T> 的内存占用是多少?
在 Rust 中,Option<T>
类型占用的内存,取决于泛型参数 T
的类型特性。具体可分为两种情况:
- 当 T
是非指针类型(如基本类型、结构体等)时,
Option<T>需要额外的空间存储 Some 或 None 的标签,此时:- 内存布局:包含一个 1 字节的标签(标识 Some 或 None)和
T类型的数据空间(可能包含对齐填充)。 - 即使为
None,仍需保留T所需的内存空间(含填充),以保障枚举值大小统一。如Option<i32>占用 8 字节(1 字节标签 + 4 字节 i32 + 3 字节填充)。
- 内存布局:包含一个 1 字节的标签(标识 Some 或 None)和
- 当 T 是不可为空的指针类型(如
Box<T>、&T、&mut T)时,Rust
编译器会启用空指针优化(Niche
Optimization)。即利用指针不能为 0 的特性,将
None标识为全零位模式(0x00),而Some(ptr)存储实际指针地址,此时无需额外标签。这个时候,None和Some(T)不占用任何的额外空间,大小与 T 相同。
我们可以写个程序来简单验证一下:
1 | fn test_option() { |
在笔者的电脑下,输出如下:
1 | 基本类型: |
通过输出我们可以观察到经过空指针优化,Option<Box<i32>>
的 None 和 Some 都只占 8
字节(与 Box<i32> 相同),同时通过为
None 复用类型的无效位模式(如
0x0)消除枚举标签,实现零成本抽象。
2. PhantomData
Rust 中的 PhantomData 是一个零大小类型(Zero-Sized Type,
ZST),用于在编译期向类型系统传递额外信息,而不占用运行时内存。它在泛型编程、生命周期管理和所有权标记中扮演关键角色。
它有以下的核心特性和作用:
零内存开销:
PhantomData<T>本身不存储任何数据,编译后会被优化掉,因此不会增加结构体的实际内存占用。1
2
3
4
5use std::marker::PhantomData;
struct Wrapper<T> {
data: u32,
_marker: PhantomData<T>, // 不占空间
}标记未使用的泛型参数:Rust 要求泛型参数必须在结构体中被显式使用。若泛型参数未直接出现在字段中,可通过
PhantomData标记其存在性,避免编译错误。1
2
3
4struct Resource<T> {
handle: *mut (),
_phantom: PhantomData<T>, // 标记类型 T
}声明生命周期依赖:当结构体包含原始指针(如
*const T)时,PhantomData可绑定生命周期,确保引用的数据有效性。1
2
3
4
5struct Slice<'a, T> {
start: *const T,
end: *const T,
_phantom: PhantomData<&'a T>, // 绑定生命周期 'a
}协变与逆变控制:通过
PhantomData<&'a T>或PhantomData<*mut T>等不同形式,调整类型的协变/逆变行为。1
2
3
4
5
6
7use std::marker::PhantomData;
use std::rc::Rc;
// 标记类型为 !Send 且 !Sync
struct NotThreadSafe {
_marker: PhantomData<Rc<()>>, // Rc<()> 本身是 !Send + !Sync
}