系列文章:

继上篇 Rust 实战丨手写一个 SpinLock,本篇我们继续参考 Rust Atomics and Locks 一书,来实现一个 oneshot channel

在 Go 语言中,有一句名言:

Don't communicate by sharing memory, share memory by communicating. 不要通过共享内存来通信,而要通过通信来共享内存。

讲的就是通道 channel。使用 channel 来通信,一方面可以避免共享状态的并发竞争问题,另一方面可以解耦生产者和消费者。

channel 根据生产者和消费者的数量,可以分为以下几种:

  1. 单生产者单消费者 (SPSC)
  2. 单生产者多消费者 (SPMC)
  3. 多生产者单消费者 (MPSC)
  4. 多生产者多消费者 (MPMC)

单生产者单消费者这个分类中,有一种特殊且常用的场景,叫一次性通道(oneshot channel)。它在 SPSC 的基础上增加了额外约束:整个生命周期内只传递一次数据,传递完成后通道就失效了。

熟悉 Go 语言的读者应该对以下使用场景很熟悉,这些都是典型的 oneshot channel 应用:

1
2
3
4
5
6
7
8
9
func demo1() {
done := make(chan struct{})
go func() {
// ... do something
close(done)
}()

<-done
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func demo2() {
oneShot := make(chan string, 1)
go func() {
oneShot <- generateText()
}()

text := <-oneShot
doSthWithText(text)
}

func generateText() string {
// ...
}

func doSthWithText(text string) {
// ...
}

在 Rust 社区里面,就有一个非常优秀的 oneshot 实现,在详细深入它的实现之前,我们先参考 Rust Atomics and Locks 一书,来尝试实现一个 oneshot channel!

读完本篇你能学到什么

  1. 一次性 (oneshot) 通道的场景与优势

    • 了解它与多生产者/多消费者通道的区别
    • 掌握常见使用模式(如线程同步、单次结果返回)
  2. Rust 并发核心原语的渐进式实践

    • UnsafeCell:内部可变性基石
    • AtomicBool + Ordering::{Release,Acquire}:最小化同步原语
    • MaybeUninit<T>:零成本延迟初始化
  3. 用所有权与生命周期设计零误用 API

    • Sender / Receiver 拆分并一次性消费
    • 生命周期引用 vs Arc 的权衡与替换技巧
  4. 线程挂起/唤醒机制

    • std::thread::park / unpark 的阻塞式等待模型
  5. 类型系统层面的“防呆”手段

    • 利用 PhantomData 禁止跨线程误用
    • Drop 手动回收,避免内存泄漏
  6. 一步步优化的思考路径

    • 如何发现问题 → 提出假设 → 实现 → 验证 → 再迭代

带着这些目标,跟随本文一路迭代到 v8,你将拥有一个高性能、零误用的 oneshot channel,以及一整套可迁移到其它并发场景的设计思维。

热身版 v0:基于锁的通道

我们先来实现一个万能版 channel 热热身。顾名思义,channel 分为 2 个功能,sendreceive,其中:

  • sendchannel 一头放数据。
  • receivechannel 另外一头取数据,如果没有数据,则阻塞住,直到有数据时返回取出数据并返回。

在 Rust 中,我们可以用队列 VecQueue 来作为数据的承载,同时为了对队列访问的并发安全,我们需要使用锁 Mutex 来保护它,另外,在消费者取数据时,如果没有数据,则需要阻塞并等待唤醒(使用循环等待就太耗 CPU 了),所以我们可以使用条件变量 Condvar 来实现挂起和唤醒。

经过以上分析,我们可以定义如下的结构:

1
2
3
4
5
6
7
8
9
use std::{
collections::VecDeque,
sync::{Condvar, Mutex},
};

pub struct Channel<T> {
queue: Mutex<VecDeque<T>>,
item_ready: Condvar,
}

sendreceive 方法也比较简单,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
impl<T> Channel<T> {
pub fn new() -> Self {
Self {
queue: Mutex::new(VecDeque::new()),
item_ready: Condvar::new(),
}
}

pub fn send(&self, message: T) {
// 上锁并从队列后面插入数据
self.queue.lock().unwrap().push_back(message);
// 唤醒一个等待数据的线程
self.item_ready.notify_one();
}

pub fn receive(&self) -> T {
// 抢占队列
let mut b = self.queue.lock().unwrap();
loop {
// 尝试从队列中获取数据,如果获取到,则直接返回(并释放锁)
if let Some(message) = b.pop_front() {
return message;
}
// 没有数据,则挂起当前线程(同时释放锁)
b = self.item_ready.wait(b).unwrap();
}
}
}

这里需要注意的是,self.queue.lock().unwrap() 返回的 b 是一个 MutextGuard,所以当执行 self.item_ready.wait(b) 的时候,在挂起当前线程的时候,会释放 b,所以这里不会一直占用锁,而导致其他线程抢不到锁。

这个版本的实现在功能上当然没有问题,但是在性能上还有非常多可以优化的地方,尤其是在锁的使用上,在高并发的情况下,锁的竞争会非常激烈。

OK,热完身后,我们开始基于这个实现,来一步步实现一个高性能的 oneshot channel

基础版 v1:unsafe 提醒使用者

1
2
3
4
pub struct Channel<T> {
queue: Mutex<VecDeque<T>>,
item_ready: Condvar,
}

我们先来分析一下,一个 oneshot channel 的结构,需要包含哪些字段。

  1. 首先它可能会有 0 条数据或 1 条数据,所以很当然,数据可以用一个 Option 来承载。
  2. 另外,sendreceive 可以在不同的线程中被调用,所以我们只能用共享只读引用,而不是用 mut 独享可变引用,但是 sendreceive 都需要对数据进行修改,所以我们这里就需要一个支持内部可变性的数据结构,这个时候,就用到了上篇 Rust 实战丨手写一个 SpinLock 介绍的 UnsafeCell<T>,它允许在共享引用下进行内部可变性修改,是 Rust 并发原语的基石,这里不再赘述。
  3. 最后,我们需要一个变量来表明是否有数据,为了并发安全,这里可以用 AtomicBool,为此,我们也增加了一个 is_ready 的方法,用于判断数据是否已准备好。对于原子变量,我们使用一对 ReleaseAcquireRelease 确保之前的写入对其他线程可见,Acquire 确保能看到之前的 Release 写入)来确保原子变量的跨线程可见性。

基于以上分析,我们定出了新的 Channel 结构:

1
2
3
4
pub struct Channel<T> {
message: UnsafeCell<Option<T>>,
ready: AtomicBool,
}

对应的 sendreceiveis_ready 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
impl<T> Channel<T> {
pub fn new() -> Self {
Self {
message: UnsafeCell::new(None),
ready: AtomicBool::new(false),
}
}

/// Safety: Only call this once!
pub unsafe fn send(&self, message: T) {
unsafe {
self.message.get().write(Some(message));
}
self.ready.store(true, Ordering::Release);
}

pub fn is_ready(&self) -> bool {
self.ready.load(Ordering::Acquire)
}

/// Safety: Only call this once,
/// and only after is_ready() returns true!
pub unsafe fn receive(&self) -> T {
unsafe { self.message.get().read().unwrap() }
}
}

在这个版本中:

  1. 我们暂且使用 unsafe 加注释的方式,来 提醒 使用者,sendreceive 只能被调用一次,同时,在调用 receive 之前,必须先使用 is_ready 进行数据检查。
  2. 对于原子变量,我们使用一对 Release 和 Acquire 来确保原子变量的跨线程可见性,具体可参考 Rust 原理丨聊一聊 Rust 的 Atomic 和内存顺序

另外别忘了,UnsafeCell<T> 是不支持 Sync 的,所以为了我们的 Channel 可以跨线程使用,我们需要为其实现 Sync trait:

1
2
3
// 1. Channel<T> 可以在不同的线程中被分别执行 send 和 receive,所以它的引用可以在线程中共享,所以需要实现 Sync;
// 2. T 由线程 1 生成并放入 Channel,然后由线程 2 从 Channel 中获取,所以它需要从一个线程转移到另外一个线程,所以需要实现 Send。
unsafe impl<T> Sync for Channel<T> where T: Send {}

使用方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#[test]
fn one_thread_should_work() {
let channel = Channel::new();
unsafe {
channel.send(1);
};
if channel.is_ready() {
let msg = unsafe { channel.receive() };
assert_eq!(msg, 1);
}
}

#[test]
fn cross_thread_should_work() {
let channel = Channel::new();
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_millis(10));
unsafe {
channel.send(1);
};
});

loop {
if channel.is_ready() {
let res = unsafe { channel.receive() };
assert_eq!(res, 1);
break;
}
}
});
}

基础版 v2:使用 MaybeUninit 替代 Option 减少内存开销

我们先来思考一个问题:Option<T> 的内存占用是多少?

结论是::Option<T> 相比于 T,可能需要额外消耗标记位和填充位的空间。具体可参考附录:1. Option<T> 的内存占用是多少

另外一点是,Option<T> 其实已经包含了是否存在值的信息了,它跟 ready 这个标志的作用其实重复了,有一些浪费。

在当下场景,我们可以使用另外一个数据结构来替代 Option<T> —— MaybeUninit<T>,相比于 Option<T>,它有以下优势:

  1. 内存占用优化:在 Option<T> 中,对于非空指针优化(Niche Optimization)的类型,None 会占用额外的空间(一个字节的标签+可能的对齐填充)。而 MaybeUninit<T> 本身就是一个大小与 T 相同的未初始化内存,它没有标签,因此不会引入额外的内存开销。
  2. 避免初始化开销:使用 Option<T> 时,在初始化时设置为 None,实际上会写入一个表示 None 的值(即进行初始化)。而 MaybeUninit<T>uninit() 不会对内存进行任何初始化,这在性能敏感的场景下可以避免不必要的初始化开销(特别是当 T 很大时)。
  3. 更灵活地控制初始化:在通道的实现中,消息可能由生产者写入,然后通过设置 ready 标志来通知消费者。使用 MaybeUninit 允许我们延迟初始化,直到实际需要写入消息的时候。这样,在通道创建时,我们不需要为 T 类型的值进行任何初始化(即使是 None),而是留出一块未初始化的内存,在后续由生产者写入实际的值。
  4. 与原子标志配合更高效:在上个版本的视线中,ready 是一个 AtomicBool,用于指示消息是否就绪。在 Option<T> 版本中,我们需要检查 Option 是否为 Some,同时还要检查 ready 标志。而使用 MaybeUninit 后,我们完全依赖 ready 标志来判断消息是否可用,避免了双重检查(因为 MaybeUninit 本身不携带状态,所以状态完全由 ready 控制)。这样,结构体的内存布局更紧凑,且访问模式更直接。
  5. 潜在的性能提升:由于避免了额外的标签和初始化,以及更紧凑的内存布局,可能会提高缓存利用率,从而提升性能。

MaybeUninit<T> 有以下常用方法:

常用方法 作用 安全级别
MaybeUninit::uninit() 创建一块完全未初始化的内存 const fnsafe
as_mut_ptr() / as_ptr() 取出裸指针,供外部写入或读取 safe
assume_init() / assume_init_read() 告诉编译器“这里已经是一个合法的 T 了”,并返回它 unsafe(因为你得保证真初始化过)
write(val) 按位把 val 复制/移动 到这块未初始化内存;此后视为已初始化 unsafe

经过上面一顿分析,我们来使用 MaybeUninit<T> 来替代 Option<T>,进一步减少内存占用和提升性能,新的 Channel 结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pub struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}

impl<T> Channel<T> {
pub fn new() -> Self {
Self {
// 创建一块完全未初始化的内存,先占位
message: UnsafeCell::new(MaybeUninit::uninit()),
ready: AtomicBool::new(false),
}
}
}

对应的 sendreceive 实现更新如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
impl<T> Channel<T> {
/// Safety: Only call this once!
pub unsafe fn send(&self, message: T) {
unsafe {
// 从 UnsafeCell<MaybeUninit<T>> 中取出 MaybeUninit<T> 并写入数据。
(*self.message.get()).write(message);
}
self.ready.store(true, Ordering::Release);
}

/// Safety: Only call this once,
/// and only after is_ready() returns true!
pub unsafe fn receive(&self) -> T {
// 从 UnsafeCell<MaybeUninit<T>> 中取出 MaybeUninit<T> 并读出数据。
unsafe { (*self.message.get()).assume_init_read() }
}
}

其中 send 中,(*self.message.get())UnsafeCell<MaybeUninit<T>> 中取出 MaybeUninit<T>,然后调用 write 方法把 message 写入这块未初始化的内存中,此后视为已初始化,并可以使用使用 assume_init_read 进行读取。

修改后,测试代码没有发生变化,我们执行之前的测试代码,发现还是可以通过的!

基础版 v3:增加动态检查提高安全性

上述版本中,我们通过 unsafe 和注释去“要求”调用者严格遵循以下约束:

  1. sendreceive 最多只调用一次。
  2. receive 调用之前,必须先经过 is_ready 的检查。

在这个版本中,我们加一下动态检查,如果调用者不按要求做事,那就直接 panic 给出告警。

receive 中,我们需要做 2 点保证:① 已经有数据了,② 数据只被消耗了一次。

1
2
3
4
5
6
pub unsafe fn receive(&self) -> T {
if !self.ready.swap(false, Ordering::Acquire) {
panic!("no message available!")
}
unsafe { (*self.message.get()).assume_init_read() }
}

这里我们使用 swap,将 readyfalse 转为 true,达到了 2 个目的:

  1. 如果返回了 false,则说明之前的 readyfalse,即数据没准备好。
  2. 如果返回了 true,则说明数据已经准备好了,这个时候,也已经将 ready 置为 false,这样后面调用的 receive 也将失败。

对于 send,我们需要保证只写入一次,所以这里我们需要引入一个新的变量 in_user,表示 send 是否已经使用了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
pub struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
in_use: AtomicBool, // 新变量,表示 send 是否已经使用了。
ready: AtomicBool,
}

impl<T> Channel<T> {
pub fn new() -> Self {
Self {
message: UnsafeCell::new(MaybeUninit::uninit()),
in_use: AtomicBool::new(false),
ready: AtomicBool::new(false),
}
}
}

send 中,我们依旧使用 swap,来将 in_use 从转为 true,如果返回 true,则说明之前已经执行过 send 了,这个时候将执行 panic 进行告警。

1
2
3
4
5
6
7
8
9
10
/// Panics when trying to send more than one message.
pub unsafe fn send(&self, message: T) {
if self.in_use.swap(true, Ordering::Relaxed) {
panic!("can't send more than one message")
}
unsafe {
(*self.message.get()).write(message);
}
self.ready.store(true, Ordering::Release);
}

通过上述的 2 个优化,我们的 Channel 又“安全”了一丢丢!

基础版 v4:实现 Drop 自动清理无用内存

因为我们使用了 MaybeUninit<T>,所以我们需要自己管理 T 的内存管理,但在上述的实现中,可能存在一种情况,导致内存得不到释放:我们只执行了 send,但直到 Channel 超过作用域的时候,都没有被 receive

为此,我们可以为 Channel 实现 Drop trait,当有数据的时候,对MaybeUninit<T> 进行内存释放:

1
2
3
4
5
6
7
8
9
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
if *self.ready.get_mut() {
unsafe {
self.message.get_mut().assume_init_drop();
}
}
}
}

安全非阻塞版 v5:提供安全方法,减少使用者误用

在这个版本中,我们来解决前面实现的最大问题:方法是不安全的,严重依赖调用者的自觉性,没有充分发挥 Rust 强大编译器的检查能力

回顾我们的需求:我们要实现的是一个 oneshot channel,即只能调用一次 sendreceive

第一个问题是:如何利用 Rust 天然的编译器检查能力来约束这一点呢?很明显,就是所有权机制!什么东西只能执行一次呢?消耗所有权的东西!

1
2
// 对于第一个参数为 self 的方法,执行时,会转移所有权,执行后,原变量就不能再用了,因为所有权已经转移了。
fn do(self) {}

好,那第二个问题就来了:self 方法只能调用一次,但很明显我们总共需要 2 次的调用(sendreceive),所以这里我们可以将 Channel 进行拆开,分成 SenderReceiver

那第三个问题也就随之而来了,SenderReceiver 都需要持有 Channel,并且可能处于不同的线程,这里我们可以先用 Arc 来对 Channel 进行引用。

解决了上述 3 个问题,我们可以梳理新的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
pub struct Sender<T> {
channel: Arc<Channel<T>>,
}

pub struct Receiver<T> {
channel: Arc<Channel<T>>,
}

struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let a = Arc::new(Channel {
message: UnsafeCell::new(MaybeUninit::uninit()),
ready: AtomicBool::new(false),
});
(Sender { channel: a.clone() }, Receiver { channel: a })
}
  1. Channel 中移除了 in_use 属性,因为我们已经有 self 做所有权检查了,不再需要 in_use 来避免重复调用 send 了。
  2. 新增了 Sender<T>Receiver<T> 两个结构,它们都各自持有了一个 Arc<Channel>

对应的 sendreceive 方法当然也就转移到 Sender<T>Receiver<T> 身上了,实现也和之前基本一致:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
impl<T> Sender<T> {
pub fn send(self, messgae: T) {
unsafe { (*self.channel.message.get()).write(messgae) };
self.channel.ready.store(true, Ordering::Release);
}
}

impl<T> Receiver<T> {
pub fn is_ready(&self) -> bool {
self.channel.ready.load(Ordering::Relaxed)
}

/// Safety: only after is_ready() returns true!
pub fn receive(self) -> T {
if !self.channel.ready.swap(false, Ordering::Acquire) {
panic!("no message available!");
}
unsafe { (*self.channel.message.get()).assume_init_read() }
}
}

// Drop 没变

修改了结构了,我们需要修改对应的测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#[test]
fn one_thread_should_work() {
let (sender, receiver) = channel();
sender.send(1);
if receiver.is_ready() {
let msg = receiver.receive();
assert_eq!(msg, 1);
}
}

#[test]
fn cross_thread_should_work() {
let (sender, receiver) = channel();
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_millis(10));
sender.send(1); // 没有 unsafe 了!
});

loop {
if receiver.is_ready() {
let res = receiver.receive(); // 没有 unsafe 了!
assert_eq!(res, 1);
break;
}
}
});
}

在最新的测试代码中,我们已经不再需要 unsafe 代码了!这对于使用者来说,就非常友好了!

而且这个时候,你如果尝试执行多次 sendreceive 的时候,编译器就会报错了!

不过它还是有 2 个缺点:

  1. Arc 的复制还是有一些开销的。
  2. 我们依旧依赖使用者提前用 is_ready 来检查,否则直接调用 receive 就有可能会 panic

我们先来解决第 1 个问题。

安全非阻塞版 v6:使用生命周期加引用,避免 Arc 的复制开销

为了避免 Arc 的开销,我们需要在 Sender<T>Receiver<T> 中持有 Channel<T> 的引用,而引用的对象的生命周期是不确定的,所以我们需要加入生命周期标注,来告诉编译器我们的引用是逻辑自洽的。

新的结构和构造函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
pub struct Sender<'a, T> {
channel: &'a Channel<T>, // 使用引用替代 Arc,并加入生命周期标注
}

pub struct Receiver<'a, T> {
channel: &'a Channel<T>, // 使用引用替代 Arc,并加入生命周期标注
}

pub struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}

impl<T> Channel<T> {
pub const fn new() -> Self {
Self {
message: UnsafeCell::new(MaybeUninit::uninit()),
ready: AtomicBool::new(false),
}
}

pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
*self = Self::new();
(Sender { channel: self }, Receiver { channel: self })
}
}

// Drop 没变

在这个版本的实现中,我们新增了 split 方法:

  1. 其中参数 &'a mut self 表明它是一个独占引用,即 channel.split() 不会有并发问题。
  2. 第一行代码 *self = Self::new() 我们对原有的 Channel 进行重置,保证拆分之前通道里绝对没有残留数据,避免旧消息被下一对 Sender/Receiver 误使用。
  3. 'a 直接来自于 &'a mut self,保证两端把手绝不会比原始 Channel 活得更久

新的测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#[test]
fn one_thread_should_work() {
let mut channel = Channel::new();
let (sender, receiver) = channel.split();
sender.send(1);
if receiver.is_ready() {
let msg = receiver.receive();
assert_eq!(msg, 1);
}
}

#[test]
fn cross_thread_should_work() {
let mut channel = Channel::new();
let (sender, receiver) = channel.split();
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_millis(100));
sender.send(1);
});

while !receiver.is_ready() {}
assert_eq!(receiver.receive(), 1);
});
}

安全阻塞版 v7:去掉 is_ready 完全避免使用者误调用

截止目前的实现版本中,我们还是依赖使用者在执行 receive 之前先执行 is_ready 进行数据检查,还是存在一定的误操作性。现在我们来实现一个完全阻塞的版本,来完全避免这个情况。

我们需要做几件事情:

  1. 去掉 is_ready 方法;
  2. receive 中,根据 ready 判断是否存在数据:
    1. 如果存在,则直接取出数据并返回;
    2. 如果不存在,则需要先挂起当前线程,等待唤醒(直接 CPU 循环检查肯定可以,但不够优雅!咱不干!);
  3. send 中,放入数据后,尝试唤醒可能处于挂起中的线程。

那现在最重要的一个问题是:如何唤醒处于挂起中的线程?更进一步,唤醒哪个线程?

这里其实是说不定的,因为 SenderReceiver 都可能被放入任何一个线程中,不过在 Rust Atomics and Locks 书中,作者假定了 Receiver 会固定在调用 split 的那个线程。

笔者认为这个假设是简单且有效的,回顾一下我们前面举的 Go 语言的 2 个例子:

1
2
3
4
5
6
7
8
9
func demo1() {
done := make(chan struct{}) // 类似于 split
go func() {
// ... do something
close(done)
}()

<-done // receiver
}
1
2
3
4
5
6
7
8
9
func demo2() {
oneShot := make(chan string, 1) // 类似于 split
go func() {
oneShot <- generateText()
}()

text := <-oneShot // receiver
doSthWithText(text)
}

在这两个最常见的 oneshot channel 的例子中,Receiver 就是处于调用 split 的线程中。所以我们可以基于这个假设来实现这个版本。

先回顾下 Thread 2 个最核心的方法:

  • Thread.park(thread): 挂起线程,等待唤醒。
  • thread.unpark(): 唤醒线程。

首先我们需要在 Sender 中保存待唤醒的线程:

1
2
3
4
pub struct Sender<'a, T> {
channel: &'a Channel<T>,
receiving_thread: Thread,
}

split() 的时候,我们需要获取当前线程并保存在 Sender 中:

1
2
3
4
5
6
7
8
9
10
11
pub fn split<'a>(&'a mut self) -> (Sender<'a, T>, Receiver<'a, T>) {
*self = Self::new();
(
Sender {
channel: self,
// 获取当前线程。记住!这里我们假设了 receiver 会固定在 split 的线程中!
receiving_thread: thread::current(),
},
Receiver { channel: self },
)
}

recevie 的时候,如果没有数据,我们就可以挂起当前线程,等待唤醒:

1
2
3
4
5
6
7
8
impl<T> Receiver<'_, T> {
pub fn receive(self) -> T {
while !self.channel.ready.swap(false, Ordering::Acquire) {
thread::park(); // 挂起当前线程,即 thread::current() 线程
}
unsafe { (*self.channel.message.get()).assume_init_read() }
}
}

send 完数据后,唤醒可能挂起的线程:

1
2
3
4
5
6
7
impl<T> Sender<'_, T> {
pub fn send(self, messgae: T) {
unsafe { (*self.channel.message.get()).write(messgae) };
self.channel.ready.store(true, Ordering::Release);
Thread::unpark(&self.receiving_thread); // 唤醒 receiver 线程
}
}

最后删除之前的 is_ready 方法,然后更新我们的测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#[test]
fn one_thread_should_work() {
let mut channel = Channel::new();
let (sender, receiver) = channel.split();
sender.send(1);
let msg = receiver.receive();
assert_eq!(msg, 1);
}

#[test]
fn cross_thread_should_work() {
let mut channel = Channel::new();
let (sender, receiver) = channel.split();
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_millis(100));
sender.send(1);
});
assert_eq!(receiver.receive(), 1); // 不再需要检查 is_ready,这里会阻塞一直直到有数据到来
});
}

最终版 v8:使用 PhantomData 来保证 Receiver 处于 split() 线程

是不是觉得,上述实现已经完美无瑕了!其实不然,我们虽然假设了 Receiver 处于调用 split() 的线程中,但是还是无法阻止使用者将 Receiver 转移到其他线程。

再次回顾下我们现在的 ChannelReceiver

1
2
3
4
5
6
7
8
9
10
pub struct Receiver<'a, T> {
channel: &'a Channel<T>,
}

pub struct Channel<T> {
message: UnsafeCell<MaybeUninit<T>>,
ready: AtomicBool,
}

unsafe impl<T> Sync for Channel<T> where T: Send {}

我们为 Channel<T> 实现了 Sync trait,而标准库中有这 2 行代码:

1
2
impl<T: ?Sized + Sync> Sync for &T {}
impl<T: ?Sized + Sync> Send for &T {}

所以 &Channel<T> 实现了 Sync/Send trait,而 Receiver 只持有了一个 &'a Channel<T>,所以它也是 Send 的!

所以 Receiver 是可以被转移到其他线程的,即下述的测试代码在编译上也是通过的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#[test]
fn cross_thread_should_work() {
let mut channel = Channel::new();
let (sender, receiver) = channel.split(); // 执行 split() 的线程
thread::scope(|s| {
s.spawn(|| {
sleep(Duration::from_millis(100));
sender.send(1);
});

// 这里在另外一个线程中,执行了 `receiver.receive()`
s.spawn(|| {
assert_eq!(receiver.receive(), 1);
});
});
}

但是我们执行后会发现,receiver.receive() 会被永久阻塞住,这是因为 sender.send(1) 只会唤醒执行 split() 的线程。

为了避免这种情况的发生,我们需要强行防止 Receiver 实现 Send trait!

怎么办呢?我们需要做到 2 件事情:

  1. Receiver 持有一个非 Sync 的属性;
  2. 这个属性除了标记没有其他作用,最好不要占用任何的资源。

这里我们介绍一位新朋友:PhantomData

PhantomData 是一个零大小类型(Zero-Sized Type, ZST),用于在编译期向类型系统传递额外信息,而不占用运行时内存。

我们可以用它在包一个 !Send 的类型,这样 Receiver 就是 !Send 的了。关于 PhantomData 的更多介绍,可以参考附录 2:PhantomData

在介绍完 PhantomData 后,我们就可以使用它来防止 Receiver 实现 Send trait 了:

1
2
3
4
pub struct Receiver<'a, T> {
channel: &'a Channel<T>,
_no_send: PhantomData<*const ()>,
}

其中 *const()!Send 的,所以我们的 Receiver 再也不会被转移到其他线程了,而 send 是要求 self,所以即便是 Sync 的也无所谓了,因为无法通过引用来执行 receive() 方法。

现在我们可以再次执行上面的测试代码(强行将 Receiver 移动到其他线程中),将会得到以下的报错:

1
2
3
4
5
6
7
8
9
10
11
12
error[E0277]: `*const ()` cannot be sent between threads safely
--> src/oneshotchannel.rs:103:21
|
103 | s.spawn(|| {
| ----- ^-
| | |
| _______________|_____within this `{closure@oneshotchannel.rs:103:21}`
| | |
| | required by a bound introduced by this call
104 | | assert_eq!(receiver.receive(), 1);
105 | | });
| |_____________^ `*const ()` cannot be sent between threads safely

到这里,通过 8 个版本,我们一步步实现了一个高性能、低内存占用且安全可用的 oneshot channel 了!

总结

至此,通过 8 个小版本,我们不仅手写了一个 高性能、安全友好的 oneshot channel,还更进一步体验了 Rust 在并发领域 “以类型系统驱动正确性” 的威力。我们来做一个简单的小结。

关键收获:

版本 新增能力 解决了什么问题
v0 Mutex + Condvar 通用通道 打开话题、对比后续无锁方案
v1 UnsafeCell<Option<T>> + AtomicBool 去锁化、最小可行一次性通道
v2 替换为 MaybeUninit<T> 节省内存&避免双状态检查
v3 运行时检查 (swap) 阻止未准备/二次调用导致 UB
v4 Drop 清理 防止“只 send 不 recv”泄漏
v5 Sender / Receiver 所有权 API 编译期保证“仅调用一次”
v6 生命周期引用替换 Arc 消除引用计数开销
v7 park / unpark 阻塞模型 使用者不再需要轮询 is_ready
v8 PhantomData 防跨线程误用 类型系统彻底封死错误用法

核心记忆点:

  • 内部可变性UnsafeCell 是所有并发原语的基石。
  • 延迟初始化MaybeUninit<T> + “就绪标志” 是零成本组合。
  • Release / Acquire:最轻量的跨线程可见性保障。
  • 所有权设计 API:让编译器替你兜底逻辑约束。
  • PhantomData:零大小但能影响 Send/Sync 的类型级标记。
  • 迭代思路:先跑通,再收口安全性与性能,最后用类型系统“防呆”。

完整的代码可以参考:conutils-oneshot

oneshot crate 浅探

附录

1. Option<T> 的内存占用是多少?

在 Rust 中,Option<T> 类型占用的内存,取决于泛型参数 T 的类型特性。具体可分为两种情况:

  • 当 T 是非指针类型(如基本类型、结构体等)时,Option<T> 需要额外的空间存储 Some 或 None 的标签,此时:
    • 内存布局:包含一个 1 字节的标签(标识 Some 或 None)和 T 类型的数据空间(可能包含对齐填充)。
    • 即使为 None,仍需保留 T 所需的内存空间(含填充),以保障枚举值大小统一。如 Option<i32> 占用 8 字节(1 字节标签 + 4 字节 i32 + 3 字节填充)。
  • 当 T 是不可为空的指针类型(如 Box<T>、&T、&mut T)时,Rust 编译器会启用空指针优化(Niche Optimization)。即利用指针不能为 0 的特性,将 None 标识为全零位模式(0x00),而 Some(ptr) 存储实际指针地址,此时无需额外标签。这个时候,NoneSome(T) 不占用任何的额外空间,大小与 T 相同。

我们可以写个程序来简单验证一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
fn test_option() {
// 1. 基本数据类型
let i: i32 = 1;
let i_none: Option<i32> = None;
let i_some: Option<i32> = Some(1);
println!("基本类型:");
println!("i32: {} bytes, ptr: {:p}", mem::size_of_val(&i), &i); // 4 bytes
println!("None<i32>: {} bytes, ptr: {:p}", mem::size_of_val(&i_none), &i_none); // 8 bytes
println!("Some<i32>: {} bytes, ptr: {:p}", mem::size_of_val(&i_some), &i_some); // 8 bytes

// 2. 自定义类型
#[repr(C)]
struct Data {
a: u64,
b: u32,
}
let data = Data { a: 1, b: 1 };
let data_none: Option<Data> = None;
let data_some: Option<Data> = Some(Data { a: 1, b: 1 });
println!("\n自定义结构体:");
println!("Data: {} bytes, ptr: {:p}", mem::size_of_val(&data), &data); // 16 bytes
println!("None<Data>: {} bytes, ptr: {:p}", mem::size_of_val(&data_none), &data_none); // 24 bytes
println!("Some<Data>: {} bytes, ptr: {:p}", mem::size_of_val(&data_some), &data_some); // 24 bytes

// 3. 指针类型
let b = Box::new(1);
let b_none: Option<Box<i32>> = None;
let b_some: Option<Box<i32>> = Some(Box::new(1));
println!("\n指针类型:");
println!("Box<i32>: {} bytes, ptr: {:p}", mem::size_of_val(&b), &b); // 8 bytes
println!("None<Box<i32>>: {} bytes, ptr: {:p}", mem::size_of_val(&b_none), &b_none); // 8 bytes
println!("Some<Box<i32>>: {} bytes, ptr: {:p}", mem::size_of_val(&b_some), &b_some); // 8 bytes
let none_value = unsafe { *(&b_none as *const _ as *const i64) };
println!("None<Box<i32>> bit pattern: {:#x}", none_value); // 0x0
let some_value = unsafe { *(&b_some as *const _ as *const i64) };
println!("None<Box<i32>> bit pattern: {:#x}", some_value); // 0x15d0043c0
}

在笔者的电脑下,输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
基本类型:
i32: 4 bytes, ptr: 0x16bef203c
None<i32>: 8 bytes, ptr: 0x16bef2040
Some<i32>: 8 bytes, ptr: 0x16bef2048

自定义结构体:
Data: 16 bytes, ptr: 0x16bef2200
None<Data>: 24 bytes, ptr: 0x16bef2210
Some<Data>: 24 bytes, ptr: 0x16bef2228

指针类型:
Box<i32>: 8 bytes, ptr: 0x16bef23f8
None<Box<i32>>: 8 bytes, ptr: 0x16bef2400
Some<Box<i32>>: 8 bytes, ptr: 0x16bef2408
None<Box<i32>> bit pattern: 0x0

通过输出我们可以观察到经过空指针优化Option<Box<i32>>NoneSome 都只占 8 字节(与 Box<i32> 相同),同时通过为 None 复用类型的无效位模式(如 0x0)消除枚举标签,实现零成本抽象。

2. PhantomData

Rust 中的 PhantomData 是一个零大小类型(Zero-Sized Type, ZST),用于在编译期向类型系统传递额外信息,而不占用运行时内存。它在泛型编程、生命周期管理和所有权标记中扮演关键角色。

它有以下的核心特性和作用:

  1. 零内存开销PhantomData<T> 本身不存储任何数据,编译后会被优化掉,因此不会增加结构体的实际内存占用

    1
    2
    3
    4
    5
    use std::marker::PhantomData;
    struct Wrapper<T> {
    data: u32,
    _marker: PhantomData<T>, // 不占空间
    }
  2. 标记未使用的泛型参数:Rust 要求泛型参数必须在结构体中被显式使用。若泛型参数未直接出现在字段中,可通过 PhantomData 标记其存在性,避免编译错误。

    1
    2
    3
    4
    struct Resource<T> {
    handle: *mut (),
    _phantom: PhantomData<T>, // 标记类型 T
    }
  3. 声明生命周期依赖:当结构体包含原始指针(如 *const T)时,PhantomData 可绑定生命周期,确保引用的数据有效性。

    1
    2
    3
    4
    5
    struct Slice<'a, T> {
    start: *const T,
    end: *const T,
    _phantom: PhantomData<&'a T>, // 绑定生命周期 'a
    }
  4. 协变与逆变控制:通过 PhantomData<&'a T>PhantomData<*mut T> 等不同形式,调整类型的协变/逆变行为。

    1
    2
    3
    4
    5
    6
    7
    use std::marker::PhantomData;
    use std::rc::Rc;

    // 标记类型为 !Send 且 !Sync
    struct NotThreadSafe {
    _marker: PhantomData<Rc<()>>, // Rc<()> 本身是 !Send + !Sync
    }