本篇我们将讨论 Go 语言底层的网络编程原理,本篇将揭示 Go 语言是如何做到同步的代码,异步的执行

特此声明,本篇是笔者基于 Go 1.25.3 版本源码、并与 Google Gemini 3Pro 共创所作,非常庆幸在当今 AI 时代下获取知识已是如此便利,且也为学习者从第一性原理理解所学知识大大降低了门槛。不过本篇的篇章安排和叙述逻辑,均由笔者把控和审阅,欢迎放心阅读。

在开启本章之前,你最好对下列知识有一点的了解:

1. 宏观概述

要从根本上理解 Go 的网络编程模型,我们需要剥离掉语法糖,回到计算机体系结构和操作系统原理的 第一性原理如何高效地处理 CPU 计算与 I/O 等待之间的速度差异?

Go 的网络模型之所以强大,是因为它在一个极其优雅的抽象层(Goroutine)下,完美隐藏了复杂的异步 I/O 细节。

接下来让我们尝试由表及里,从编程模型到内核实现,分三个层级来剖析。

2.1 第一层:编程模型 —— 同步的代码,异步的执行

在 Go 1.25 中,网络编程的依然遵循着 Go 诞生之初的哲学:Goroutine-per-connection。开发者编写的是标准的 同步阻塞式(Synchronous Blocking) 代码。

1
2
3
4
5
6
7
8
9
10
// 开发者视角:逻辑是线性的
listener, _ := net.Listen("tcp", ":8080")
for {
conn, _ := listener.Accept() // 看起来这里阻塞了,直到有新连接
go func(c net.Conn) {
buf := make([]byte, 1024)
n, _ := c.Read(buf) // 看起来这里阻塞了,直到有数据
// 处理数据...
}(conn)
}

如果按照 C 语言或早期 Java 的传统线程模型,上述代码意味着每个连接需要一个 OS 线程。但线程太重了(栈内存约 1MB - 8MB,上下文切换成本高)。但是在 Go 语言中,当你调用 c.Read 时,当前的 Goroutine 确实"暂停"了,但底层的操作系统线程(M)并没有阻塞,而是去干别的活了。这样开发者拥有了编写简单线性逻辑的权利,同时享受了非阻塞 I/O 的高性能。

2.2 第二层:系统调用层 —— 非阻塞 I/O 的伪装

为了实现上述同步阻塞的假象,Go 在底层实际上使用的是 非阻塞 I/O(Non-blocking I/O)

在 Go 1.25 的 net 包内部,当你创建一个 socket 时,Go Runtime 会通过系统调用(如 Linux 下的 socket + fcntl)显式地将该文件描述符(File Descriptor, FD)设置为 Non-blocking 模式。

当你调用 conn.Read() 时,Go 底层实际执行了以下逻辑:

  1. 直接尝试读取: 直接对 FD 发起 read 系统调用。
  2. EAGAIN 错误: 绝大多数时候,内核缓冲区是空的。因为 FD 是非阻塞的,操作系统不会让线程睡眠,而是立刻返回一个 EAGAIN(或 EWOULDBLOCK)错误,表示现在没数据,别堵在这。
  3. 捕获错误并挂起: Go 的网络库捕获到这个错误,意识到"现在读不到数据"。于是,它不会让代码报错,而是通过 Runtime 调度器将当前的 Goroutine 状态置为 Gwaiting(等待中),并将该 Goroutine 移出 CPU 执行队列。

2.3 第三层:Runtime 核心 —— Netpoller 与 GMP 的联动

这是 Go 网络模型的心脏。Go 引入了一个名为 Netpoller 的组件,它是 Go Runtime 与操作系统 I/O 多路复用机制(I/O Multiplexing)之间的桥梁。

Netpoller 并不是一个一直运行的独立线程,而是 Runtime 中的一组函数逻辑。它封装了不同操作系统的多路复用技术:

  • Linux: epoll
  • macOS/FreeBSD: kqueue
  • Windows: IOCP

在 Linux 的 epoll 中,包含 3 个核心函数:

  • 新建多路复用器:epoll_create()
  • 插入监听事件:epoll_ctl()
  • 查询发生了什么事件:epoll_wait()

Go 的 Netpoller 提供了对各个平台多路复用器的抽象和适配:

  • netpollinit -> epoll_create
  • netpollopen -> epoll_ctl
  • netpoll -> epoll_wait

让我们回到刚才 conn.Read() 返回 EAGAIN 的时刻:

  1. 注册(Register): 当前运行的 Goroutine (G) 在被挂起前,会将自己的 FD 和期望的事件(如可读)注册到 Netpoller 中。本质上是调用了 epoll_ctl 将 FD 加入监听列表。
  2. 让出(Park): G 停止运行,M(系统线程)现在空闲了。M 会根据 GMP 调度模型,从 P(处理器)的本地队列中抓取下一个可运行的 G 去执行。
  3. 监控(Poll): 什么时候唤醒原来的 G?
    • 被动触发: 当系统监控线程 sysmon 运行,或者调度器发现没有 G 可运行时,会调用 runtime.netpoll
    • 底层机制: runtime.netpoll 内部调用 epoll_wait,询问操作系统我关注的那些 FD 有哪些数据到了。
  4. 唤醒(Ready): 操作系统返回就绪的 FD 列表。Netpoller 根据 FD 找到当初阻塞在上面的 Goroutine,将其状态改为 Grunnable(可运行),并将其注入到当前 P 的本地队列或全局队列中。
  5. 执行: 在下一轮调度中,原来的 G 被 M 拿到,继续执行 conn.Read() 后面的代码。

2.4 小节

如果用文字总结这套机制的精髓,可以概括为:用户态的阻塞,内核态的非阻塞;线性的逻辑,事件驱动的内核。

完整的数据流向图解:

  1. User: conn.Read(buf)
  2. Go Runtime (Poll): syscall.Read(fd) -> 返回 EAGAIN
  3. Go Scheduler:
    • 调用 netpollOpen (注册 epoll)
    • 调用 gopark (挂起当前 G,状态 -> Gwaiting)
    • 线程 M 切换去执行其他 G
  4. --- 时间流逝,网络包到达网卡 ---
  5. OS Kernel: 中断处理,数据拷贝到内核缓冲区,FD 变为 Readable。
  6. Go Runtime (Monitor/Schedule):
    • sysmon 或 调度器执行 netpoll (epoll_wait)
    • 发现 FD 就绪
    • 调用 goready (找到对应的 G,状态 -> Grunnable)
  7. Go Scheduler: G 被放入队列,最终被 M 执行。
  8. User: conn.Read 从挂起处恢复,再次执行 syscall.Read,成功读取数据。
sequenceDiagram
    autonumber
    participant G as User Goroutine (G)
    participant NP as Netpoller (Internal)
    participant Sched as Go Scheduler (M/P)
    participant OS as OS Kernel (epoll/IO)

    Note over G, Sched: 阶段一:发起读请求 (User Space)

    G->>NP: 1. conn.Read(buf)
    activate G
    activate NP

    NP->>OS: 2. syscall.Read(fd) (非阻塞)
    OS-->>NP: 3. 返回 EAGAIN (无数据)

    Note right of NP: 判定需要挂起

    NP->>OS: 4. netpollOpen / epoll_ctl
(注册 FD 到 epoll 实例) NP->>Sched: 5. gopark (请求挂起 G) deactivate NP deactivate G activate Sched Note over G: 状态: Grunning -> Gwaiting Note over Sched: 6. 线程 M 解绑当前 G
M 切换去执行其他 G deactivate Sched Note over G, OS: 阶段二:异步等待 (Kernel Space) G-x G: (Goroutine 暂停,不消耗 CPU) Note over OS: ... 时间流逝 ... Note over OS: 7. 网络包到达 -> 中断处理
数据拷入内核缓冲区 -> FD Readable Note over G, OS: 阶段三:唤醒与执行 (Runtime Monitor) loop Sysmon 或 调度器检查 Sched->>OS: 8. netpoll (epoll_wait) OS-->>Sched: 9. 返回就绪 FD 列表 end activate Sched Sched->>Sched: 根据 FD 找到对应的 G Sched->>Sched: 10. goready(G) Note over G: 状态: Gwaiting -> Grunnable Sched-->>G: 11. G 被放入本地/全局队列
最终被 M 捕获并执行 deactivate Sched activate G Note over G: 从 gopark 处恢复代码执行 G->>NP: 12. 再次调用 internal read activate NP NP->>OS: 13. syscall.Read(fd) OS-->>NP: 14. 返回实际数据 (Data) NP-->>G: 15. 返回 n, err deactivate NP deactivate G

2. 源码剖析

在对 Go 的网络编程模型有了一定的宏观了解后,本篇我们将深入底层源码来剖析 Go Runtime 是如何实现上面这些能力的。

2.1 Go 的系统调用的封装

在 Go1.16 左右的版本(笔者之前研究的是 Go.16 版本,对其他版本可能不太熟悉),Go 对 epoll_createepoll_ctl 等系统调用,每个都有单独的汇编实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// int32 runtime·epollcreate(int32 size);
TEXT runtime·epollcreate(SB),NOSPLIT,$0
MOVL size+0(FP), DI
MOVL $SYS_epoll_create, AX
SYSCALL
MOVL AX, ret+8(FP)
RET

// func epollctl(epfd, op, fd int32, ev *epollEvent) int
TEXT runtime·epollctl(SB),NOSPLIT,$0
MOVL epfd+0(FP), DI
MOVL op+4(FP), SI
MOVL fd+8(FP), DX
MOVQ ev+16(FP), R10
MOVL $SYS_epoll_ctl, AX
SYSCALL
MOVL AX, ret+24(FP)
RET

但是当最近笔者在阅读 Go 1.25 版本的源码时,发现 Go 已经统一了系统调用的入口了,如 linux amd64 平台上,Go 将系统调用统一封装在 internal/runtime/syscall/asm_linux_amd64.s,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// func Syscall6(num, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, errno uintptr)
TEXT ·Syscall6<ABIInternal>(SB),NOSPLIT,$0
// a6 already in R9.
// a5 already in R8.
MOVQ SI, R10 // a4
MOVQ DI, DX // a3
MOVQ CX, SI // a2
MOVQ BX, DI // a1
// num already in AX.
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS ok
NEGQ AX
MOVQ AX, CX // errno
MOVQ $-1, AX // r1
MOVQ $0, BX // r2
RET
ok:
// r1 already in AX.
MOVQ DX, BX // r2
MOVQ $0, CX // errno
RET

我们不用太纠结它的具体实现,通过注释,我们可以知道这段汇编对应的就是 Go 里面的 Syscall6,具体位于 runtime/syscall/syscall_linux.go#L17:

1
2
// Syscall6 calls system call number 'num' with arguments a1-6.
func Syscall6(num, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2, errno uintptr)

它的具体运用在 syscall/syscall_linux.go#L95

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//go:uintptrkeepalive
//go:nosplit
//go:linkname Syscall6
func Syscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno) {
runtime_entersyscall()
r1, r2, err = RawSyscall6(trap, a1, a2, a3, a4, a5, a6)
runtime_exitsyscall()
return
}

//go:uintptrkeepalive
//go:nosplit
//go:norace
//go:linkname RawSyscall6
func RawSyscall6(trap, a1, a2, a3, a4, a5, a6 uintptr) (r1, r2 uintptr, err Errno) {
var errno uintptr
r1, r2, errno = runtimesyscall.Syscall6(trap, a1, a2, a3, a4, a5, a6)
err = Errno(errno)
return
}

这个时候 epollo_createepollo_waitepollo_ctl 就很好实现了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func EpollCreate1(flags int32) (fd int32, errno uintptr) {
r1, _, e := Syscall6(SYS_EPOLL_CREATE1, uintptr(flags), 0, 0, 0, 0, 0)
return int32(r1), e
}

var _zero uintptr

func EpollWait(epfd int32, events []EpollEvent, maxev, waitms int32) (n int32, errno uintptr) {
var ev unsafe.Pointer
if len(events) > 0 {
ev = unsafe.Pointer(&events[0])
} else {
ev = unsafe.Pointer(&_zero)
}
r1, _, e := Syscall6(SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(ev), uintptr(maxev), uintptr(waitms), 0, 0)
return int32(r1), e
}

func EpollCtl(epfd, op, fd int32, event *EpollEvent) (errno uintptr) {
_, _, e := Syscall6(SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
return e
}

2.2 Go 对 Epoll 的抽象 - network poller

本文仅介绍针对 Linux AMD64 的实现。

Go NetWork Poll 是对各个平台多路复用器的抽象和适配:

  • netpollinit -> epoll_create
  • netpollopen -> epoll_ctl
  • netpoll -> epoll_wait

2.1.1 netpollinit -> epoll_create

系统指令:internal/runtime/syscall/defs_linux_amd64.go

1
SYS_EPOLL_CREATE1 = 291

Go 中的声明:EpolloCreate1

1
2
3
4
5
6
7
8
9
10
func EpollCreate1(flags int32) (fd int32, errno uintptr) {
r1, _, e := Syscall6(SYS_EPOLL_CREATE1, uintptr(flags), 0, 0, 0, 0, 0)
return int32(r1), e
}

func netpollinit() {
var errno uintptr
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
// ...
}
  • _EPOLL_CLOEXEC:创建的 epfd 会设置 FD_CLOEXEC,它是一个 fd 的标识说明,用来设置文件的 close-on-exec 状态的。当 close-on-exec 状态为 0 时,调用 exec 时,fd 不会被关闭;非零状态时则会被关闭,这样做可以防止 fd 泄露给执行 exec 后的进程。

针对 Linux 的实现:runtime/netpoll_epoll.go

  1. 创建 epoll 实例:创建 Linux 的 I/O 多路复用器,用于同时监控成千上万个网络连接。
  2. 创建 eventfd:创建一个特殊的文件描述符,用于唤醒阻塞线程。
  3. 将 eventfd 注册到 epoll:这样既能等网络事件,也能被主动唤醒。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 新建多路复用器,这个函数在 Go 程序启动时被调用一次,用于初始化 Linux 平台的网络轮询器(netpoller)
func netpollinit() {
var errno uintptr
// 1. 创建一个 epoll 实例,返回的文件描述符存储在全局变量 `epfd` 中
// `EPOLL_CLOEXEC` 标志确保在 `exec` 时自动关闭这个 fd
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if errno != 0 {
println("runtime: epollcreate failed with", errno)
throw("runtime: netpollinit failed")
}
// 2. 创建一个 eventfd,这是 Linux 的一种特殊文件描述符
// 设置为非阻塞模式(EFD_NONBLOCK)和 exec 时关闭(EFD_CLOEXEC)
// eventfd 用于唤醒阻塞在 `epoll_wait` 上的线程。这是 Go netpoller 的关键机制!
efd, errno := syscall.Eventfd(0, syscall.EFD_CLOEXEC|syscall.EFD_NONBLOCK)
if errno != 0 {
println("runtime: eventfd failed with", -errno)
throw("runtime: eventfd failed")
}
// 3. 构造 epollo 事件结构,syscall.EPOLLIN 表示监听可读事件
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
// 4. 将 netpollEventFd 的地址存储到 ev.Data 中
// 当 epoll 返回事件时,我们需要知道是哪个 fd 触发的事件。
// 通过 Data 字段,我们可以区分:
// - 是 eventfd 触发的(唤醒信号)
// - 还是某个网络连接 fd 触发的(真正的网络 I/O)
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollEventFd
// 5. 使用 `EPOLL_CTL_ADD` 操作将 eventfd 添加到 epoll 实例中
// 当 eventfd 变为可读时,epoll_wait 会返回
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, efd, &ev)
if errno != 0 {
println("runtime: epollctl failed with", errno)
throw("runtime: epollctl failed")
}
// 6. 将 eventfd 保存到全局变量中
// 后续 `netpollBreak()` 函数会使用这个 fd 来唤醒阻塞的 epoll_wait
netpollEventFd = uintptr(efd)
}

2.1.2 netpollopen -> epoll_ctl

系统指令:internal/runtime/syscall/defs_linux_amd64.go

1
SYS_EPOLL_CTL     = 233

Go 中的声明:EpolloCtl

1
2
3
4
func EpollCtl(epfd, op, fd int32, event *EpollEvent) (errno uintptr) {
_, _, e := Syscall6(SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
return e
}
  • epfd:epoll_create 函数返回的文件描述符,用于标识内核中的 epoll 实例
  • op:对 fd 文件描述符的操作类型:
    • EPOLL_CTL_ADD:向 interest list 添加一个需要监视的描述符
    • EPOLL_CTL_DEL:向 interest list 删除一个描述符
    • EPOLL_CTL_MOD:修改 interst list 中的一个描述符
  • fd:需要被操作的文件描述符
  • event:一个指向名为 epoll_event 的结构的指针,它存储了我们实际要监视的 fd 的事件
    • EPOLLIN:表示对应的文件描述符可以读。
    • EPOLLOUT:表示对应的文件描述符可以写。
    • EPOLLERR:表示对应的文件描述符发生错误。
    • EPOLLHUP:表示对应的文件描述符被挂断。
    • EPOLLRDHUP:表示对端关闭连接或半关闭写端。
    • EPOLLET: 将 epoll 设为边缘触发(Edge Triggered)模式,相对于水平触发(Level Triggered)来说的。

针对 Linux 的实现:runtime/netpoll_epoll.go

  1. 传入一个 socket 的 fd,和 pollDesc 指针,pollDesc 是 Go 中对 socket 的抽象。pollDesc 中记录了 socket 的详细信息,以及哪个协程休眠在等待此 socket;
  2. 将 socket 的可读、可写、断开事件注册到 epoll 中;
  3. 将 epoll 设置为 ET 模式。
1
2
3
4
5
6
7
8
9
// 将 fd 的四个事件 syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET 注册到 epfd 上
// 开始监控其 I/O 事件
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

2.1.3 netpoll -> epoll_wait

系统指令:internal/runtime/syscall/defs_linux_amd64.go

1
SYS_EPOLL_PWAIT   = 281

Go 中的声明:EpolloWait

1
2
3
4
5
6
7
8
9
10
func EpollWait(epfd int32, events []EpollEvent, maxev, waitms int32) (n int32, errno uintptr) {
var ev unsafe.Pointer
if len(events) > 0 {
ev = unsafe.Pointer(&events[0])
} else {
ev = unsafe.Pointer(&_zero)
}
r1, _, e := Syscall6(SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(ev), uintptr(maxev), uintptr(waitms), 0, 0)
return int32(r1), e
}
  • epfd:epoll_create 函数返回的文件描述符,用于标识内核中的 epoll 实例。
  • ev 已经分配好的 epoll_event 结构体数组,epoll 会把发生的事件存入 events 中。
  • maxev:告诉内核最多返回的事件数量有多大,必须大于 0。
  • waitms:超时时间,-1 表示 epoll 将无限制等待下去。

针对 Linux 的实现:runtime/netpoll_epoll.go

  1. 根据 delay 确定要轮询多久;
  2. 创建一个长度为 128 的事件列表;
  3. 调用系统底层的 epollwait,查询有多少事件发生了;
  4. 新建一个协程列表;
  5. 遍历事件列表;
  6. 获取 go 中对 fd 的抽象结构体的值 pd;
  7. 将 pd 中的 g 取出来加入到 toRun 列表中;
  8. 返回可执行的 goroutine 列表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// 注意返回的是一个可执行的 Goroutine 列表
func netpoll(delay int64) (gList, int32) {
// 1. 计算超时时间
var waitms int32
if delay < 0 { // 无限等待,阻塞直到有事件
waitms = -1
} else if delay == 0 { // 非阻塞,立即返回
waitms = 0
} else if delay < 1e6 {
waitms = 1 // 小于 1 微秒的延迟,至少等待 1 毫秒,毫秒是最小粒度,0 会变成非阻塞
} else if delay < 1e15 {
waitms = int32(delay / 1e6) // 正常范围:转换纳秒到毫秒 (1ms = 1e6 ns)
} else {
waitms = 1e9 // 超大延迟,限制为约 11.5 天
}

// 2. 准备接收最多 128 个就绪事件
var events [128]syscall.EpollEvent

retry:
// 3. 调用 epoll_wait 等待事件
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 {
if errno != _EINTR {
println("runtime: epollwait on fd", epfd, "failed with", errno)
throw("runtime: netpoll failed")
}
if waitms > 0 {
return gList{}, 0
}
goto retry
}

// 4. 初始化返回值
var toRun gList // 就绪的 goroutine 列表
delta := int32(0) // netpollWaiters 的调整值

// 5.处理所有返回的事件
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}

// 6. 判断是否是 eventfd 的唤醒信号,eventfd 用于从外部唤醒 epoll_wait
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollEventFd {
// eventfd 应该只产生 EPOLLIN 事件
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: eventfd ready for", ev.Events)
throw("runtime: netpoll: eventfd ready for something unexpected")
}

// 消费唤醒信号
if delay != 0 {
var one uint64
read(int32(netpollEventFd), noescape(unsafe.Pointer(&one)), int32(unsafe.Sizeof(one)))
// 清除唤醒标志,允许下次 netpollBreak
netpollWakeSig.Store(0)
}
// 跳过 eventfd,继续处理其他事件,eventfd 只是唤醒机制,不对应真实的网络 I/O
continue
}

// 7.根据触发的事件设置读写模式
var mode int32

// [可读事件] 检查各种可读条件
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
// EPOLLIN: 有数据可读
// EPOLLRDHUP: 对端关闭写端(半关闭)
// EPOLLHUP: 连接挂断
// EPOLLERR: 发生错误
mode += 'r'
}

// [可写事件] 检查各种可写条件
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
// EPOLLOUT: 可以写入数据
// EPOLLHUP: 连接挂断
// EPOLLERR: 发生错误
mode += 'w'
}
// 注意:mode 可能是 'r'(114), 'w'(119), 或 'r'+'w'(233)

if mode != 0 {
// 8. 获取 netpoller 对 socket 的抽象实例 pollDesc
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer())
tag := tp.tag() // 提取序列号标签

// 9. 检查是否是过期事件
if pd.fdseq.Load() == tag {
// 序列号匹配,这是有效的事件
// 原因:防止 ABA 问题(fd 被关闭后重新打开复用)
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)

// 10. 将就绪的 goroutine 加入运行队列
delta += netpollready(&toRun, pd, mode)
}
// else: 序列号不匹配,忽略过期事件,说明这个 pollDesc 已经被新的连接复用了
}
}

// 11. 返回就绪的 goroutine 列表和等待计数调整值
return toRun, delta
}

netpollready() 表示 pd 底层的 fd 已经可以进行 I/O 操作了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
delta := int32(0)
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true, &delta)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
return delta
}

2.1.4 谁在调用 netpoll()?

2.1.4.1 垃圾回收循环

runtime/proc.go 中的 startTheWorldWithSema() 会调用 netpoll()

1
2
3
4
5
6
7
8
func startTheWorldWithSema(emitTraceEvent bool) int64 {
...
if netpollinited() {
list := netpoll(0) //调用 netpoll
injectglist(&list)
}
...
}

runtime/mgc.go 中的 gcStart() 会调用 startTheWorldWithSema(),而 gcStart() 又会被我们的 g0 协程一直循环执行。

1
2
3
4
5
6
7
8
9
10
// gcStart starts the GC.
func gcStart(trigger gcTrigger) {
...
// Concurrent mark.
systemstack(func() {
now = startTheWorldWithSema(trace.enabled) // 调用 startTheWorldWithSema
...
})
...
}

而且 g0 协程在循环 gc 的时候,顺带执行了 netpoll() 来检查是否有事件发生。

2.1.4.2 协程调度

深入浅出 Go 语言的 GPM 模型(Go1.21) 中,我们提到了 Go 协程调度最核心的函数 schedule()

1
2
3
4
5
func schedule() {
// ..
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
// ...
}

协程调度的时候会去执行 findRunnable() 寻找可以运行的 Goroutine,这里面也会调用 netpoll() 检查是否有网络事件发生。

1
2
3
4
5
6
7
8
9
10
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
// ..
// Poll network until next timer.
if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
//..
list, delta := netpoll(delay) // block until new work is available
// ...
}
// ..
}

2.2 network poll 对 socket 的抽象 —— pollDesc

Go 的 netpoller 需要进一步对 socket 进行抽象,是为了解决 2 个核心问题:

  1. 状态同步问题:如何让 Go 调度器(用户态)和操作系统内核(内核态)共享同一个 socket 的状态(是读还是写?是谁在等?)。
  2. 生命周期错位问题:操作系统内核的通知是异步的,可能在 Go 已经关闭或复用了文件描述符(FD)之后,内核才发来一个旧的就绪通知。这会导致严重的内存腐坏或逻辑错误。

为此,Go 定义了两个数据结构:pollDescpollCache。我们将 pollDesc 看作"桥梁",将 pollCache 看作"安全区"

2.2.1 pollDesc

pollDesc 是 Go 运行时为每个网络文件描述符(socket)创建的轮询描述符对象,用于管理该 fd 的异步 I/O 状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Network poller descriptor.
//
// No heap pointers.
type pollDesc struct {
_ sys.NotInHeap
link *pollDesc // in pollcache, protected by pollcache.lock
fd uintptr // constant for pollDesc usage lifetime
fdseq atomic.Uintptr // protects against stale pollDesc

// atomicInfo holds bits from closing, rd, and wd,
// which are only ever written while holding the lock,
// summarized for use by netpollcheckerr,
// which cannot acquire the lock.
// After writing these fields under lock in a way that
// might change the summary, code must call publishInfo
// before releasing the lock.
// Code that changes fields and then calls netpollunblock
// (while still holding the lock) must call publishInfo
// before calling netpollunblock, because publishInfo is what
// stops netpollblock from blocking anew
// (by changing the result of netpollcheckerr).
// atomicInfo also holds the eventErr bit,
// recording whether a poll event on the fd got an error;
// atomicInfo is the only source of truth for that bit.
atomicInfo atomic.Uint32 // atomic pollInfo

// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

lock mutex // protects the following fields
closing bool
rrun bool // whether rt is running
wrun bool // whether wt is running
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rt timer // read deadline timer
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

核心字段解析:

  1. fd:这是最原始的操作系统文件描述符(例如 Linux 上的 int 类型的 5, 6 等)。它是连接到 epoll / kqueue 的物理句柄。

  2. rg (Read Group) / wg (Write Group)这是最重要的字段。 它们实现了无锁(Lock-free)的状态流转。它们不仅仅存储 Goroutine 的指针(*g),还是一个多状态的原子变量:

    • 0 (pdNil): 没有任何 Goroutine 在等待。
    • 1 (pdReady): I/O 已经就绪(网卡有数据了),不需要等待,直接读。
    • 2 (pdWait): 正在准备挂起,作为中间状态。
    • > 2 (G Pointer): 存储了正在阻塞等待的 Goroutine 的内存地址。

    epoll_wait 返回就绪事件时,Netpoller 会通过 rgwg 里的地址找到那个 G,然后调用 goready(G) 唤醒它。

  3. 超时管理(rtwtrdwd):管理读写操作的 deadline。Go 的 SetReadDeadlineSetWriteDeadline 就是在这里实现的。每个网络连接自带两个定时器。如果超时触发,定时器回调会强制将 rgwg 状态置为错误,并唤醒 G。G 醒来后发现是超时导致的唤醒,于是返回 timeout error

  4. 防止过时通知(fdseqrseqwseq):通过序列号防止在 fd 复用后收到旧的就绪通知。

  5. link:指向下一个空闲的 pollDesc,后面会详细分析。

2.2.1 pollCache

网络程序中会频繁地打开和关闭连接,每个连接都需要一个 pollDesc。如果每次都分配新对象并最终让 GC 回收,会带来巨大的性能开销。pollCache 通过对象池模式复用 pollDesc,大幅提升性能。用一句话概述就是:pollCache 是一个专门用于分配 pollDesc 的链表式缓存池。

1
2
3
4
5
6
7
8
9
type pollCache struct {
lock mutex // 锁
first *pollDesc // 指向 pollDesc 链表的第一个节点,即下一个可用的空闲节点(头插法)
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}

相信不少读者都会注意到注释中的这句话:

1
// PollDesc objects must be type-stable,

为什么呢?想象下面这样一个流程:

  1. 你打开了一个 Socket,FD 为 10。
  2. Go 将 FD 10 注册给 epoll,由于内核并没有给我们回调函数,epoll 内部通常存储的是 pollDesc内存地址作为 user_data
  3. 你关闭了连接。Go 回收了 FD 10,也释放了 pollDesc 的内存。
  4. 危险时刻:假设这块内存立刻被 Go 的 GC 分配给了一个 string 或者是其他对象。
  5. 延迟通知:此时,内核里积压的一个关于 FD 10 的"可读"事件突然触发了(或者是一个极端的竞态条件)。epoll 返回了那个旧的 pollDesc 内存地址,告诉 Runtime 这里"可读"。
  6. 崩溃:Runtime 以为这还是个 pollDesc,试图去修改它的 rg 字段。但这块内存现在存的是一个字符串!结果:内存腐坏(Memory Corruption),程序直接崩溃且极难调试。

解决方案:Type-Stable Memory(类型稳定内存)

pollCache 保证了通过它分配出去的内存块,即使被释放回收了,也永远只能作为 pollDesc 存在,绝不会被 GC 挪作他用。

  • sys.NotInHeap: 标记这个结构体不在普通的 GC 堆上管理,而是手动管理的(pollDesc 的第一个字段)。
  • 链表管理:
    • lock: 保护链表。
    • first: 指向链表头部的空闲 pollDesc
    • 分配: 从 first 取一个。如果链表空,向 OS 申请一大块内存(4KB),切分成多个 pollDesc 串到链表上。
    • 释放: 并不是真的 free 掉内存,而是把它放回 first 链表头,留给下一个连接复用。

这就保证了:即使内核发来一个过期的通知,Runtime 访问的那个内存地址依然是一个合法的 pollDesc 结构体(虽然它可能不再关联任何活跃连接),最多就是读到一个无效状态,而不会导致内存越界或类型错误。

2.2.1.1 分配 alloc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
// 1. 未初始化,则先进行初始化,一次性分配 n 个 pollDesc
if c.first == nil {
type pollDescPadded struct {
pollDesc
pad [tagAlign - unsafe.Sizeof(pollDesc{})]byte
}
const pdSize = unsafe.Sizeof(pollDescPadded{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, tagAlign, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
lockInit(&pd.lock, lockRankPollDesc)
pd.rt.init(nil, nil)
pd.wt.init(nil, nil)
pd.link = c.first
c.first = pd
}
}
// 2. 取出链表头部的空闲节点
pd := c.first
// 3. 移动到下一个空闲节点
c.first = pd.link
unlock(&c.lock)
return pd
}

2.2.1.2 回收 free

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (c *pollCache) free(pd *pollDesc) {
// pd can't be shared here, but lock anyhow because
// that's what publishInfo documents.
lock(&pd.lock)

// 1. 自增 fdseq,避免处理过期事件造成生命周期错位问题
fdseq := pd.fdseq.Load()
fdseq = (fdseq + 1) & (1<<tagBits - 1)
pd.fdseq.Store(fdseq)

// 2. 重置 pollDesc
pd.publishInfo()

unlock(&pd.lock)

lock(&c.lock)

// 3. 放回链表头部
pd.link = c.first
c.first = pd
unlock(&c.lock)
}

即便内存类型安全了,我们还面临逻辑上的 ABA 问题

  1. Goroutine A 使用 FD 10 (pollDesc 地址 0x123)。
  2. A 关闭连接,释放 FD 10,释放 pollDesc (0x123 返回缓存池)。
  3. Goroutine B 建立新连接,刚好系统又分配了 FD 10,且 pollCache 又把 0x123 分配给了 B。
  4. 此时,内核发来了 A 时代的 FD 10 的就绪事件。
  5. Runtime 拿着 0x123,以为是 B 的数据来了,错误地唤醒了 B(或者处理了错误的数据)。

fdseq 的作用: 每次 pollDesc 被复用(从缓存池拿出来)时,fdseq 都会自增。

  • 当注册 epoll 时,Go 会把当前的 fdseq 记录在某个地方(或者在检查时比对)。
  • 当事件回来时,Runtime 会检查:Event.seq == pollDesc.seq?
  • 如果不相等,说明是个过期事件,直接忽略,不进行唤醒操作。

2.2.3 总结

  1. pollDesc (State): 使用 atomic.Uintptr 存储 Goroutine 指针,实现了用户态 G 与内核态 I/O 事件的高效无锁传递
  2. pollCache (Memory): 使用类型稳定内存(Type-Stable Memory),从物理内存布局的层面消灭了异步 I/O 可能导致的内存腐坏风险。
  3. fdseq (Logic): 使用版本号机制,解决了资源复用带来的逻辑混淆(ABA 问题)。

这就是为什么 Go 的网络库在高并发、高动态(大量连接建立和断开)场景下,依然稳如磐石的底层原因。

2.3 network poller 工作细节

2.3.1 初始化 poll_runtime_pollServerInit

通过原子操作 & 双重检查来执行一次 netpollinit(),创建一个 epoll。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}

func netpollGenericInit() {
// 类似于 双重检查 的单例模式
// 保证只执行一次 netpollinit()
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lockInit(&pollcache.lock, lockRankPollCache)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
netpollinit() // epoll_create() 创建一个多路复用器
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}

补充:go:linkname

补充:go:linkname

The //go:linkname directive instructs the compiler to use“importpath.name” as the object file symbol name for the variable orfunction declared as “localname” in the source code. Because thisdirective can subvert the type system and package modularity, it is onlyenabled in files that have imported “unsafe”.

//go:linkname的目的是告诉编译器使用importpath.name来对本来不可导出的(localname)函数或者变量实现导出功能。由于这种方法是破坏了Go语言的模块化规则的,所以必须在导入了"unsafe"包的情况下使用。

即:

由于 Go语法规则限制,小写字母开头的函数或者变量是本模块私有的,不可被包外的代码访问;但是如果必须要能被外部模块访问到,又要限制为私有方法呢?只能在编译器上做手脚,通过一个特殊的标记 来实现这种功能。

具体到上面的例子:

1
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
  • 表示调用 internal/poll.runtime_pollServerInit相当于调用当前的 poll_runtime_pollServerInit

2.3.2 新增监听 poll_runtime_pollOpen

  1. 在 pollcache 链表中分配一个 pollDesc,用来描述要新增将它的 socket;
  2. 初始化 pollDesc,主要是将 rg、wg 置为 0;
  3. 调用 netpollopen,将底层 socket 及其读、写和断开事件注册到 epoll 上;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 1. 分配一个 pollDesc,用来描述要新增监听的 socket
pd := pollcache.alloc()
// 2. 上锁
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
// 3. 赋值
pd.fd = fd
if pd.fdseq.Load() == 0 {
// The value 0 is special in setEventErr, so don't use it.
pd.fdseq.Store(1)
}
pd.closing = false
pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil) // 初始值,还没感兴趣的 Goroutine
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil) // 初始化,还没感兴趣的 Goroutine
pd.wd = 0
pd.self = pd
pd.publishInfo()
// 4. 解锁
unlock(&pd.lock)

// 5. 调用 netpollopen -> epoll_ctl
// 将 pd 关联的 fd 的相关事件注册到 epoll 上
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}

2.3.3 判断是否就绪 poll_runtime_pollWait

  1. 协程要对 socket 进行 read 或者 write 的时候,底层就会调用 poll_runtime_pollWait;
  2. 该方法循环调用 netpollblock(),直到 netpollblock() 返回 true,表明 rg 或 wg 已经置为 pdReady 了,可以进行读或者写了。
  3. netpollblock():
    1. 根据 mode,取出 rg 或者 wg,命名为 gpp;
    2. 如果 gpp 是 pdReady,直接返回 true,否则,置为 pdWait,返回 false。
1
2
3
4
5
6
7
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
1
func runtime_pollWait(ctx uintptr, mode int) int
1
2
3
4
5
6
7
8
9
10
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
// 循环调用 netpollblock,直到 netpollblock 返回 true
// 也就是 rg 或 wg 已经置为 pdReady 了,可以读 / 写了
for !netpollblock(pd, int32(mode), false) {
...
}
return pollNoError
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// returns true if IO is ready, or false if timed out or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// 1. 根据 mode,看看是要读还是要写
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
// 2. 已经 pdReady 了,返回 true,完成
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}

// 3. 没有 pdReady,则先置为 pdWait,再往下走
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
}

// 4. 调用 gopark 阻塞当前 goroutine
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
}
// 5. 当 gopark 返回时,表示被唤醒,重置为 pdNil
old := gpp.Swap(pdNil)

// 6. 如果是 pdReady,则返回 true,否则可能是超时等原因,返回 false
return old == pdReady
}
1
2
3
4
5
6
7
8
9
10
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
netpollAdjustWaiters(1)
}
return r
}

2.3.4 调度协程去读写 socket

  • socket 已经可以读写:

    1. runtime 循环调用 netpoll() 方法;

      前面分析过了,是 g0 协程在 gc 的时候顺便调用了 netpoll。

    2. 发现 socket 可读写时,给对应的 rg 或 wg 置为 pdReady(1);

    3. 协程调用 poll_runtime_pollWait() 判断 socket 是否就绪;判断 rg 或者 wg 已经置为 pdReady(1),那就返回 0;

    4. runtime 就知道 socket 可以操作了。

  • socket 暂时不可读写:

    1. runtime 循环调用 netpoll() 方法;
    2. netpoll 中没有监听到任何事件,执行不到 netpollready,没有对 pd 做任何改变;
    3. 协程调用 poll_runtime_pollWait() 判断 socket 是否就绪:
      1. 判断 rg 或 wg 还是 pdNil(0),就将 rg 或者 wg 置为 pdWait(2)
      2. 调用 gopark 将协程进行休眠等待;
      3. 然后再进入 netpollblockcommit 将 rg 或者 wg 置为 G pointer
    4. 假如 runtime 后面再循环调用 netpoll() 方法;
    5. 发现 socket 可读写时,进入 netpollready 再检查对应的 rg 或者 wg;
    6. netpollready 再进入 netpollunblock,它会检查 rg 或者 wg;
    7. 若为 G pointer,那么就将 rg 或者 wg 置为 pdReady,然后返回协程地址给 runtime;
    8. runtime 就会去调度对应协程进行 socket 的读写操作。
  • 读写后都会再将 rg 或者 wg 置为 nil

2.3.5 总结

Go 的网络操作底层为 阻塞模型(协程调度) + 多路复用(系统底层),具体情况为:

  • BIO:go 协程从网络读取数据,读取失败并且返回syscall.EAGAIN 时,依次调用 waitRead->runtime_pollWait->poll_runtime_pollWait->netpollblock->gopark 将当前协程挂起。
  • NIO:runtime 的 g0 协程在 gc 的时候会顺便调用 netpoll() 检查 socket 事件是否发生,当 socket 可操作的时候,重新唤醒对应协程,进行调度。

具体细节为:

  • runtime
    1. runtime 会一直循环去检查 socket 的可读写状态 —— netpoll()
    2. 然后再看是否有协程在等待对应的 socket:—— netpollready()
      1. 没有,那就单纯记录 pollDesc;
      2. 有那就唤醒协程,将 g 加入 toRun 列表,进行调度 —— netpollunblock()
  • goroutine
    1. 表明想要操作 socket —— poll_runtime_pollWait(pd,mode)
    2. 循环检查自己关心的 socket 是否可操作 —— netpollblock()
      1. 可以操作,goroutine 就会对 socket 进行读或写操作了;
      2. 不可操作:
        1. 就将自己休眠 —— gopark()
        2. 将 rg 或 wg 置为自己的地址 —— netpollblockcommit()

2.4 net 包

  • net 包是 go 原生的网络包;

  • net 包实现了 TCP、UDP、HTTP 等网络操作;

  • 使用 net.Listen() 可以得到 LISTEN 状态的 socket —— listener;

  • 使用 listener.Accept() 可以得到 ESTABLISHED 状态的 socket —— conn;

  • conn.Read() / Writer() 可以进行读写 socket 的操作;

  • network poll 作为上述功能的底层支撑;

    本文仅介绍 TCP 相关的部分。

2.4.1 net.netFD

netFD 是 Go 中 net 包对 socket 之类的网络文件描述符的抽象。

1
2
3
4
5
6
7
8
9
10
11
12
// Network file descriptor.
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

2.4.2 net.Listen() Listenter

1
2
3
4
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
  1. 新建 socket,并执行 bind 操作;
  2. 新建一个 netFD,它是 net 包对 socket 的详情描述;
  3. 返回一个 TCPListener 对象,底层是调用了 runtime_pollOpen 方法,将 TCPListener 的 FD 信息加入监听。TCPListener 对象本质是一个 LISTEN 状态的 socket。

2.4.3 listener.Accept()

1
2
3
4
5
6
7
8
9
10
11
12
// Accept implements the Accept method in the [Listener] interface; it
// waits for the next call and returns a generic [Conn].
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
  1. 调用 tcpListener 的 accept,本质上就是调用处于 LISTEN 状态的 socket 的 accept 方法,看看有无新的连接;
  2. 如果失败,休眠等待新的连接,底层调用了 runtime_pollWait;
  3. 如果有新的连接,那就包装成一个新的 socket,最后返回为一个 TCPConn 变量,底层是调用了 runtime_pollOpen 方法,将 TCPConn 的 FD 信息加入监听。TCPConn 对象本质是一个 ESTABLISHED 状态的 socket。

2.4.4 conn.Read() / conn.Write()

这两个方法原理差不多,下面以 Read() 为例。

1
2
3
4
5
6
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
...
n, err := c.fd.Read(b)
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...
// 循环读数据
for {
// 1. 调用系统命令 syscall.Read,读取 sysfd 上的数据,然后往 p 写数据
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
// 2. syscall.EAGAIN 说明还没数据,得先等等
if err == syscall.EAGAIN && fd.pd.pollable() {
// 3. 挂起,休眠等待
if err = fd.pd.waitRead(fd.isFile); err == nil {
// 4. 当有数据来的时候,会被唤醒走到这里,然后在回到 for 循环读取数据
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
  1. 底层直接调用 socket 原生读写方法(syscall.Read、syscall.Write);
  2. 成功则直接返回;
  3. 如果失败,休眠等待可读 / 可写事件的发生;
  4. 被唤醒后重新调用系统 socket 进行读写;

2.4.5 net.DialTCP()

Dial() 方法支持 TCP、UDP、IP、unix、unixgram 和 unixpacket 网络通讯方式,它是一个统共的方法,通过传入 network 字段来区分不同的网络类型,所以它前面很多的操作,都是在判断当前是什么网络类型。本文主要讲 TCP 的实现底层,故直接进入 DialTCP() 即可,其他的网络类型,也是大同小异的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func DialTCP(network string, laddr, raddr *TCPAddr) (*TCPConn, error) {
// 1. 看看具体是哪种 tcp 连接
switch network {
case "tcp", "tcp4", "tcp6":
default:
return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: UnknownNetworkError(network)}
}
if raddr == nil {
return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: nil, Err: errMissingAddress}
}

// 2. 创建一个系统的网络连接工具
sd := &sysDialer{network: network, address: raddr.String()}
var (
c *TCPConn
err error
)

// 3. 进行 TCP 连接
if sd.MultipathTCP() {
c, err = sd.dialMPTCP(context.Background(), laddr, raddr)
} else {
c, err = sd.dialTCP(context.Background(), laddr, raddr)
}
if err != nil {
return nil, &OpError{Op: "dial", Net: network, Source: laddr.opAddr(), Addr: raddr.opAddr(), Err: err}
}
return c, nil
}
1
2
3
4
5
6
7
8
9
10
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConn, error) {
if h := sd.testHookDialTCP; h != nil {
return h(ctx, sd.network, laddr, raddr)
}
if h := testHookDialTCP; h != nil {
return h(ctx, sd.network, laddr, raddr)
}
// 4. 进入 doDialTCP
return sd.doDialTCP(ctx, laddr, raddr)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (sd *sysDialer) doDialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConn, error) {
return sd.doDialTCPProto(ctx, laddr, raddr, 0)
}

func (sd *sysDialer) doDialTCPProto(ctx context.Context, laddr, raddr *TCPAddr, proto int) (*TCPConn, error) {
ctrlCtxFn := sd.Dialer.ControlContext
if ctrlCtxFn == nil && sd.Dialer.Control != nil {
ctrlCtxFn = func(ctx context.Context, network, address string, c syscall.RawConn) error {
return sd.Dialer.Control(network, address, c)
}
}
// 5. 有了前面的基础,到这就明白了
// internetSocket 创建一个 fd,生成一个新的 socket,并注册到 epoll 中监听
fd, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, proto, "dial", ctrlCtxFn)
for i := 0; i < 2 && (laddr == nil || laddr.Port == 0) && (selfConnect(fd, err) || spuriousENOTAVAIL(err)); i++ {
if err == nil {
fd.Close()
}
fd, err = internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, proto, "dial", ctrlCtxFn)
}

if err != nil {
return nil, err
}
// 6. 返回一个 TCPConn
return newTCPConn(fd, sd.Dialer.KeepAlive, sd.Dialer.KeepAliveConfig, testPreHookSetKeepAlive, testHookSetKeepAlive), nil
}
  1. 创建一个系统的网络连接工具 sysDialer;
  2. dial 进行 TCP 连接,连接不上那就是 connect refused;
  3. 连接上的话,创建一个新的 socket,并最后返回为一个 TCPConn 变量,底层是调用了 runtime_pollOpen 方法,将 TCPConn 的 FD 信息加入监听。TCPConn 对象本质是一个 ESTABLISHED 状态的 socket。

3. 总结