KCP 源码分析与原理总结

序言

本文很大部分参考了 详解 KCP 协议的原理和实现,非常感谢该文作者的讲解。本文再此基础上,加入了一些笔者的思考和分析图示,以期更好地理解 KCP 的底层原理。

结论先行

KCP 是一个快速可靠协议,能以比 TCP 浪费 10%-20% 的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。

TCP 是为流量设计的(每秒内可以传输多少 KB 的数据),讲究的是充分利用带宽。而 KCP 是为流速设计的(单个数据包从一端发送到一端需要多少时间),以 10%-20% 带宽浪费的代价换取了比 TCP 快 30%-40% 的传输速度。TCP 信道是一条流速很慢,但每秒流量很大的大运河,而 KCP 是水流湍急的小激流。

KCP 增加的带宽在哪里?增加的速度又在哪里?

为什么 KCP 能以比 TCP 浪费 10%-20% 的带宽的代价,换取平均延迟降低 30%-40%?

KCP 核心特性

快速重传: KCP支持快速重传机制,不像 TCP 那样依赖超时重传。KCP 可以根据接收方返回的确认信息快速判断哪些数据包已经丢失,并迅速进行重传。

选择性确认(Selective Acknowledgment, SACK): KCP 支持 SACK,这允许接收端告知发送端哪些包已经收到,从而仅重传未被确认接收的数据包,减少不必要的重传。

无连接操作: 基于 UDP 的实现使得 KCP 在传输数据前不需要像 TCP 那样进行三次握手建立连接,这减少了初始的延迟,并使其能在连接性较差的网络环境下更加灵活和快速。

拥塞控制: KCP 实现了类似 TCP 的拥塞控制算法,但更为简化,能够快速适应网络条件的变化,如带宽波动和丢包。

流量控制: KCP 允许调整发送和接收的窗口大小,使得发送方可以根据接收方的处理能力和网络条件调整数据发送速率,优化网络利用率和减少拥塞。

可配置的传输策略: KCP 允许用户根据应用需求调整内部参数,如传输间隔、窗口大小等,以达到最优的传输效率和延迟。

前向错误校正(Forward Error Correction, FEC): KCP 还可以结合使用 FEC 技术,通过发送额外的冗余数据来恢复丢失的包,进一步提高在高丢包环境下的数据传输可靠性。

为什么 TCP 做不到 KCP 这样?

TCP 作为一种成熟且广泛使用的传输协议,在设计上注重可靠性和通用性,因此在拥塞控制和流量控制方面相对保守,以确保在各种网络条件下都能稳定运行。然而,这些设计上的保守性也导致了 TCP 在某些情况下的灵活性和自适应性不如 KCP。

特性类别 协议 描述
拥塞控制机制 TCP 固定算法(慢启动、拥塞避免等),保守的调整策略(指数和线性增长)
KCP 灵活算法,动态调整策略,快速调整窗口大小
重传机制的延迟 TCP 固定重传间隔(RTO),多次确认触发重传,需要主动开启选择性重传(SACK)
KCP 快速重传,选择性重传,减少重传延迟
流量控制 TCP 固定流量控制(依赖接收窗口和发送窗口),通用性设计
KCP 自适应流量控制,应用层反馈调整发送窗口和重传策略
应用场景 TCP 广泛应用于各种网络环境,标准化要求高
KCP 优化特定场景(如高丢包率和高延迟网络),灵活实现

1. 拥塞控制机制的固定性

TCP

  • 固定算法:TCP 的拥塞控制算法,如慢启动(Slow Start)、拥塞避免(Congestion Avoidance)、快速重传(Fast Retransmit)和快速恢复(Fast Recovery),在设计时考虑了广泛的兼容性和可靠性。这些算法虽然有效,但其调整机制相对固定,响应速度较慢。
  • 保守的调整策略:TCP 的拥塞控制算法采用了保守的调整策略,例如指数增长和线性增长,这在高丢包率或高延迟网络中,可能会导致拥塞窗口(cwnd)增长速度较慢,影响传输效率。

KCP

  • 灵活算法:KCP 的拥塞控制机制更为灵活,可以根据实时网络状况进行快速调整。例如,KCP 的快速重传和选择性重传机制,使其能更快速地响应网络丢包情况。
  • 动态调整策略:KCP 的拥塞窗口调整更为灵活,可以根据网络状况快速增加或减少窗口大小,提高传输效率。

2. 重传机制的延迟

TCP

  • 固定重传间隔:TCP 使用固定的重传超时(RTO),并随着每次重传逐渐增加(指数回退),这种保守的重传机制在高延迟和高丢包率网络中可能导致重传延迟较长。
  • 多次确认触发重传:TCP 的快速重传需要等待三个重复的 ACK 才能触发,这在丢包率较高的情况下,可能会导致较长的延迟。

KCP

  • 快速重传:KCP 在检测到丢包后立即进行重传,而不需要等待多个重复的 ACK,这显著减少了重传延迟。
  • 选择性重传:KCP 只重传丢失的数据包,而不是所有未确认的数据包,减少了不必要的重传开销。(TCP 其实也支持选择性重传 SACK)

3. 流量控制的灵活性

TCP

  • 固定流量控制:TCP 的流量控制主要依赖于接收窗口(rwnd)和发送窗口(swnd),在处理突发流量或变化较大的网络条件时,调整速度较慢。
  • 通用性设计:TCP 作为一种通用协议,其设计必须兼顾各种网络环境,因此在流量控制上相对保守,以确保在任何环境下都能稳定运行。

KCP

  • 自适应流量控制:KCP 的流量控制机制可以根据实际应用需求进行更细粒度的调整。例如,KCP 可以根据延迟抖动、丢包率等动态参数调整发送速率,确保在不同网络条件下都能保持高效传输。
  • 应用层反馈:KCP 可以根据应用层的实时反馈,动态调整发送窗口和重传策略,进一步优化传输效率。

4. 应用场景的差异

TCP

  • 广泛应用:TCP 设计用于广泛的网络环境,包括稳定的有线网络和不稳定的无线网络,因此其机制必须足够通用和保守,保证在各种情况下的可靠性。
  • 标准化要求:作为互联网的基础协议,TCP 的各项机制经过严格标准化,任何修改都需要广泛测试和验证,以确保不会影响现有网络的稳定性。

KCP

  • 特定优化:KCP 设计初衷是优化特定场景下的传输性能,特别是高丢包率和高延迟网络,因此在设计上更加灵活,能够根据实时网络状况进行调整。
  • 灵活实现:KCP 可以根据具体应用需求进行优化,例如在实时通信和在线游戏等场景中,灵活的流量控制和快速重传机制显著提升了传输效率。

结论

虽然 TCP 在拥塞控制和流量控制方面具备基本的动态调整能力,但其保守的设计和标准化要求使得其在高丢包率和高延迟网络中的适应性和灵活性不如 KCP。KCP 通过灵活的拥塞控制、快速重传和自适应流量控制机制,能够更有效地应对不同网络条件下的传输需求,提供更高效的传输性能。

KCP 一定比 TCP 快吗?

不一定。KCP 并不一定在所有情况下都比 TCP 快。虽然 KCP 在某些特定网络环境(如高丢包率和高延迟的网络)中表现更优异,但在某些情况下,TCP 可能更合适。

1. 网络环境

高丢包率和高延迟网络

  • KCP:KCP 通过快速重传和选择性重传机制,以及动态调整的窗口和重传间隔,能够更好地应对高丢包率和高延迟网络,减少传输延迟,提高传输效率。
  • TCP:TCP 的重传机制和保守的拥塞控制在这种环境中可能导致较高的延迟和较低的带宽利用率。

低丢包率和低延迟网络

  • KCP:在稳定的低丢包率和低延迟网络中,KCP 的频繁重传和控制报文可能会导致额外的带宽开销,未必有明显的性能优势。
  • TCP:TCP 在这种环境中表现稳定,且由于其带宽开销较小,可能比 KCP 更高效。

2. 带宽利用率

带宽充足的网络

  • KCP:KCP 由于其频繁的重传和控制报文,可能会占用更多的带宽,但如果带宽充足,这种开销对整体性能影响较小,且其低延迟优势可能更明显。
  • TCP:TCP 的带宽利用率较高,适合带宽充足的环境。

带宽受限的网络

  • KCP:KCP 的额外带宽开销在带宽受限的网络中可能会显著影响整体传输效率。
  • TCP:TCP 的较低带宽开销使其在带宽受限的环境中更有优势。

3. 应用场景

实时应用(如在线游戏、视频会议):

  • KCP:KCP 的低延迟和快速响应能力使其非常适合实时应用,在这些场景中,传输的及时性比带宽利用率更重要。
  • TCP:TCP 在这些场景中的表现可能不如 KCP,特别是在高丢包率和高延迟的网络中。

非实时应用(如文件传输、网页浏览):

  • KCP:KCP 在这些场景中可能不如 TCP 高效,特别是在网络稳定且带宽有限的情况下。
  • TCP:TCP 的可靠性和高带宽利用率使其非常适合非实时应用。

4. 实现和配置

实现复杂性

  • KCP:实现和配置 KCP 可能比 TCP 更复杂,需要根据具体应用和网络环境进行优化和调整。
  • TCP:TCP 是一个成熟的协议,系统和库的支持较好,配置和使用相对简单。

总结

KCP 在某些特定环境和应用场景中确实比 TCP 更快,尤其是高丢包率和高延迟的网络环境,以及对低延迟要求较高的实时应用。但在网络稳定、带宽有限或非实时应用场景中,TCP 可能表现更好。因此,选择使用 KCP 还是 TCP 应根据具体的网络条件和应用需求进行权衡。

前置准备

笔者不想那么快就贴出大段大段的代码进行分析,这可能会使读者不知所云。为了更好地阐述 KCP 的底层原理,笔者的设想是先对原理部分进行概要总结,然后再带着这些结论去分析源码,进一步填充里面的边角细节。

但是呢,为了更好地理解 KCP 的原理,又不得不对涉及源码的一些重要设计,为了避免在原理分析阶段,对源码进行过多的涉及,笔者决定添加这单独的一章内容,对 KCP 的“接口设计”、“报文段”、“KCP 控制块”以及“队列和缓冲区”先进行简要概述,以辅助读者更好地理解后续的内容。

接口设计

KCP 工作简约图

kcp.h 文件中,定义了 KCP 最核心的几个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 创建一个新的 KCP 控制对象
ikcpcb* ikcp_create(IUINT32 conv, void *user);

// 释放一个 KCP 控制对象。
void ikcp_release(ikcpcb *kcp);

// 设置 KCP 的输出回调函数,这个回调函数在 KCP 需要发送数据时被调用。
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));

// 从 KCP 的接收队列中接收数据,用于上层从 KCP 中读取数据。
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);

// 向 KCP 的发送队列中添加数据,用于上层向 KCP 发送数据,KCP 会管理这些数据并负责其可靠传输。
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);

// 更新 KCP 的内部状态,通常需要定期调用。
// 这个函数负责处理 KCP 的超时、重传等操作,需要在一定的时间间隔内反复调用(通常每 10-100 毫秒)。
void ikcp_update(ikcpcb *kcp, IUINT32 current);

// 判断是否要调用 ikcp_update
IUINT32 ikcp_check(const ikcpcb *kcp, IUINT32 current);

// 处理接收到的低层数据包(例如 UDP 包)。
int ikcp_input(ikcpcb *kcp, const char *data, long size);

// 将缓冲区可以发送的包发送出去,会在 ikcp_update 中被调用。
void ikcp_flush(ikcpcb *kcp);
  • ikcp_create:

    • conv: 会话标识符,用于标识两个端点之间的连接。这个标识符在两个通信端点之间必须一致。
    • user: 用户数据指针,可以传递任意用户数据,这个数据在 KCP 的 output 回调中会被传递回去。
    • 返回值: 一个指向新创建的 KCP 控制块(ikcpcb)的指针。
  • ikcp_release: 释放一个 KCP 控制对象。

  • ikcp_setoutput: 设置 KCP 的输出回调函数。

    • output: 输出回调函数指针。这个回调函数在 KCP 需要发送数据时被调用。

      • buf: 要发送的数据缓冲区。
      • len: 数据长度。
      • kcp: 当前的 KCP 对象。
      • user: 用户数据。

      通过这个回调,KCP 可以将要发送的数据传递给下层的网络层,比如 UDP 套接字。

  • ikcp_recv: 从 KCP 的接收队列中接收数据。

    • kcp: KCP 控制对象的指针。
    • buffer: 用户提供的缓冲区,用于存储接收到的数据。
    • len: 缓冲区的长度。
    • 返回值: 成功接收的数据大小;如果没有数据可接收,返回负值(例如,EAGAIN)。

    这个函数用于上层从 KCP 中读取数据。

  • ikcp_send: 向 KCP 的发送队列中添加数据。

    • kcp: KCP 控制对象的指针。
    • buffer: 要发送的数据缓冲区。
    • len: 数据的长度。
    • 返回值: 成功发送的数据大小;如果发送失败,返回负值。

    这个函数用于上层向 KCP 发送数据,KCP 会管理这些数据并负责其可靠传输。

  • ikcp_update: 更新 KCP 的内部状态,通常需要定期调用。

    • kcp: KCP 控制对象的指针。
    • current: 当前的时间戳(以毫秒为单位)。

    这个函数负责处理 KCP 的超时、重传等操作,需要在一定的时间间隔内反复调用(通常每 10-100 毫秒)。

  • ikcp_input: 处理接收到的低层数据包(例如 UDP 包)。

    • kcp: KCP 控制对象的指针。
    • data: 收到的数据缓冲区。
    • size: 数据的长度。
    • 返回值: 成功处理的数据大小;如果处理失败,返回负值。
  • ikcp_flush: 刷新待发送的数据。

其中最重要的是这 4 个:

  • ikcp_send: 将数据放在发送队列中等待发送。
  • ikcp_recv: 从接收队列中读取数据。
  • ikcp_input: 读取下层协议输入数据,解析报文段,如果是数据,就将数据放入接收缓冲区,如果是 ACK,就在发送缓冲区中标记对应的报文段已送达。
  • ikcp_flush: 调用输出回调将发送缓冲区的数据发送出去。

这里就先简要介绍到这里,后面在源码分析篇章再对这些接口进行详细分析。

报文段

KCP 的报文段大小为 24 字节,结构如下图所示:

KCP 报文段

每个字段的含义如下:

  • conv: 连接标识
  • cmd:报文类型
  • frg:分片数量,表示随后还有多少个报文属于同一个包
  • wnd:发送方剩余接收窗口的大小
  • ts:时间戳
  • sn:报文编号
  • una:发送方的接收缓冲区中最小还未收到的报文段的编号,也就是说,比它小的报文段都已全部接收
  • len:数据段长度
  • data:数据段,只有数据报文会有这个字段

其中 cmd 共有 4 种报文类型:

  • 数据报文:IKCP_CMD_PUSH
  • 确认报文:IKCP_CMD_ACK
  • 窗口探测报文:IKCP_CMD_WASK 询问对端剩余接收窗口的大小
  • 窗口通知报文:IKCP_CMD_WINS 通知对端剩余接收窗口的大小

在 KCP 中,报文段结构定义在 kcp.h 文件中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv;
IUINT32 cmd;
IUINT32 frg;
IUINT32 wnd;
IUINT32 ts;
IUINT32 sn;
IUINT32 una;
IUINT32 len;
IUINT32 resendts;
IUINT32 rto;
IUINT32 fastack;
IUINT32 xmit;
char data[1];
};

IKCPSEG 结构还多出了几个字段,这是为了支持 KCP 协议的可靠性和效率:

  • resendts: 记录报文的下次重传时间,用于实现重传机制。如果报文在一定时间内没有被确认收到,就会在这个时间戳之后被重新发送。
  • rto: 表示当前报文的重传超时时间(RTT 的估计值)。用于计算每个报文的重传时间,如果超过 rto 时间没有收到 ACK,会触发重传。
  • fastack: 快速重传计数,记录该报文被跳过的次数。如果一个报文的 ACK 连续接收到多个对同一报文的确认,而不是新的报文,会增加这个计数,用于实现快速重传机制。
  • xmit: 记录报文已经被发送的次数。用于统计一个报文的重传次数,帮助判断传输的可靠性。如果操作 dead_link 次,则会判断为连接失效,KCP 会断开连接。
  • node: 链表节点,用于将多个 IKCPSEG 结构体链接在一起。KCP 的队列和缓冲区都是循环双链表结构。

这些字段共同作用,帮助 KCP 实现以下功能:

  • 可靠性:通过 snunaack 确保数据包按顺序接收和重传。
  • 流量控制:通过 wnd 控制数据流量,避免接收方过载。
  • 高效传输:通过 resendtsrto 进行超时和重传控制,fastack 提供快速重传机制。
  • 灵活管理:使用链表节点 node 组织数据,便于内部管理。

KCP 控制块 ikcpcb

上面我们提到的 ikcp_createikcp_release 就是对 KCP 控制块 ikcpcb 的创建和释放,每个 KCP 连接都对应一个 KCP 控制块。它定义在 kcp.h 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct IKCPCB
{
IUINT32 conv, mtu, mss, state;
IUINT32 snd_una, snd_nxt, rcv_nxt;
IUINT32 ts_recent, ts_lastack, ssthresh;
IINT32 rx_rttval, rx_srtt, rx_rto, rx_minrto;
IUINT32 snd_wnd, rcv_wnd, rmt_wnd, cwnd, probe;
IUINT32 current, interval, ts_flush, xmit;
IUINT32 nrcv_buf, nsnd_buf;
IUINT32 nrcv_que, nsnd_que;
IUINT32 nodelay, updated;
IUINT32 ts_probe, probe_wait;
IUINT32 dead_link, incr;
struct IQUEUEHEAD snd_queue;
struct IQUEUEHEAD rcv_queue;
struct IQUEUEHEAD snd_buf;
struct IQUEUEHEAD rcv_buf;
IUINT32 *acklist;
IUINT32 ackcount;
IUINT32 ackblock;
void *user;
char *buffer;
int fastresend;
int fastlimit;
int nocwnd, stream;
int logmask;
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

字段的含义如下,读者可在后续分析过程回过来查阅:

字段名 含义
conv 连接标识符,用于识别一个特定的会话。
mtu 最大传输单元(Maximum Transmission Unit),表示网络层传输数据包的最大字节数。
mss 最大报文段长度(Maximum Segment Size),表示应用层传输数据的最大字节数。
state 连接状态,标识当前的传输状态。
snd_una 未确认的发送序号,表示最早未确认的包的序号。
snd_nxt 下一个发送序号,表示即将发送的包的序号。
rcv_nxt 下一个接收序号,表示期望接收的下一个包的序号。
ts_recent 最近的时间戳,用于延迟测量。
ts_lastack 最近的确认时间戳,用于 RTT 计算。
ssthresh 拥塞避免的慢启动阈值。
rx_rttval RTT 的偏差,用于计算 RTT 的波动。
rx_srtt 平滑的 RTT 值,用于计算平均 RTT。
rx_rto 重新传输超时时间,根据 RTT 动态调整。
rx_minrto 最小的重新传输超时时间。
snd_wnd 发送窗口大小,控制发送流量的窗口。
rcv_wnd 接收窗口大小,控制接收流量的窗口。
rmt_wnd 远端窗口大小,表示对方接收窗口的大小。
cwnd 拥塞窗口大小,控制发送流量的窗口,用于拥塞控制。
probe 探测标志,表示是否需要进行窗口探测。
current 当前的时间戳。
interval 刷新间隔时间,表示定期刷新 KCP 状态的间隔。
ts_flush 下次刷新时间戳,用于确定何时执行下一次状态刷新。
xmit 发送次数,表示数据包重传的次数。
nrcv_buf 接收缓冲区的数据包数量。
nsnd_buf 发送缓冲区的数据包数量。
nrcv_que 接收队列中的数据包数量。
nsnd_que 发送队列中的数据包数量。
nodelay 延迟模式标志,表示是否启用无延迟模式。
updated 更新标志,表示是否需要更新 KCP 状态。
ts_probe 下次探测时间戳,用于窗口探测。
probe_wait 探测等待时间,表示等待多长时间后进行下一次窗口探测。
dead_link 死链标志,表示连接是否已经失效。
incr 增量,用于控制流量的增加速率。
snd_queue 发送队列,用于存储待发送的数据包。
rcv_queue 接收队列,用于存储待处理的数据包。
snd_buf 发送缓冲区,用于存储已经发送但未确认的数据包。
rcv_buf 接收缓冲区,用于存储已经接收到但未处理的数据包。
acklist 确认列表,用于存储待发送的确认序号。
ackcount 确认计数,表示确认列表中的条目数量。
ackblock 确认块大小,表示确认列表的内存分配大小。
user 用户数据指针,用于存储用户自定义的数据。
buffer 缓冲区,用于临时存储发送的数据。
fastresend 快速重传标志,表示启用快速重传功能。
fastlimit 快速重传限制,表示在一个 RTT 内允许的最大重传次数。
nocwnd 无拥塞窗口控制标志,表示是否禁用拥塞窗口控制。
stream 流模式标志,表示是否启用流模式。
logmask 日志掩码,用于控制日志输出的级别。
output 发送数据回调函数,用于发送数据。
writelog 日志回调函数,用于输出日志。

队列和缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
struct IKCPCB
{
...
struct IQUEUEHEAD snd_queue;
struct IQUEUEHEAD rcv_queue;
struct IQUEUEHEAD snd_buf;
struct IQUEUEHEAD rcv_buf;
...
};

struct IQUEUEHEAD {
struct IQUEUEHEAD *next, *prev;
};

KCP 中队列和缓冲区都是循环双链表,链表由宏实现,笔者并不擅长,所以本文就不探讨该链表的实现了,有数据结构基础的笔者应该很好理解这一块。

队列和缓冲区的实现:循环双链表

队列和缓冲区是 KCP 最核心的部分,它们的作用流程大概如下图所示,读者可以自行阅读尝试理解,后续我们会进行详细的分析。

KCP 队列和缓冲区作用流程

原理分析

这一节我们详细讨论 KCP 的整个 ARQ 流程。首先我们会对整体流程进行简要概述,然后详细讨论滑动窗口中的发送和接收过程,接着讨论超时重传和快速重传,在这之后我们会将 KCP 和 TCP 的重传策略进行简单对比,最后介绍一下拥塞控制策略。

1. 整体流程

KCP 全流程

KCP 的全流程如上图所示:

  1. 发送方调用 ikcp_send 将发送数据,这个时候会创建报文段实例,并放入 snd_queue 发送队列中。
  2. KCP 会定时调用 ikcp_update 判断是否要调用 ikcp_flush
  3. 调用 ikcp_flush 时会将合适的报文段放入 snd_buf 缓冲区中,具体包括:
    1. 发送 ACK 列表中所有 ACK;
    2. 根据是否需要发送窗口探测和通知报文,需要则发;
    3. 根据发送窗口大小,将适量的报文段从 snd_queue 移入 snd_buf 中;
    4. 发送 snd_buf 中的报文,包括新加入的RTO 内未收到 ACK 的和 ACK 失序若干次的;
    5. 根据丢包情况计算 ssthreshcwnd
  4. 发送的时候会调用由 ikcp_setoutput 设置的回调函数,将数据发送到对端。
  5. 接收方收到数据后,会调用 ikcp_input,将数据放入 rcv_buf 缓冲区,具体包括:
    1. 根据所有报文的 una 将相应的报文标记为已送达;
    2. 如果是 ACK,就将相应的报文标记为已送达;
    3. 如果是数据报文,就将它放入 rcv_buf,然后将 rcv_buf 中顺序正确的报文移入 rcv_queue 接收队列中,接着将相关信息插入 ACK 列表,在稍后的 ikcp_flush 中会发送相应的 ACK;
    4. 如果是窗口探测报文,就标记“需要发送窗口通知”,在稍后的 ikcp_flush 中会发送窗口通知报文;
    5. 包括窗口通知报文在内的所有报文都有 wnd 字段,据此更新 rmt_wnd;
    6. 根据 ACK 失序情况决定是否进行快速重传;
    7. 计算 cwnd。
  6. 调用 ikcp_recvrcv_queue 中接收数据。

2. 滑动窗口

发送缓冲区 snd_buf 和接收缓冲区 rcv_buf 中活动的报文都是在滑动窗口之中的。这对于我们理解 KCP 的发送和接收流程非常重要,所有我们先从滑动窗口开始介绍。

滑动窗口实际是一个抽象的概念, 不能简单地认为它是缓冲区的一部分,准确的说,滑动窗口是由队列加缓冲区共同组成的。

2.1 发送

发送窗口

snd_unasnd_nxt 会努力往移动:

  1. ikcp_flush 时,会从 snd_queue 中取出报文插入到 snd_nxt 的位置上;
  2. 如果 snd_nxt - snd_una >= cwnd,则不允许新的报文插入;
  3. snd_una 的 ACK 报文到达时,snd_una 就会右移到第一个没有收到 ACK 报文的位置;

发送窗口中未确认到达的报文何时重传?

  • 报文在一个 RTO 时间内仍未确认到达,就会重传。报文 RTO 初始值是 rx_rto ,会持续增长,速率支持配置。

2.2 接收

接收窗口
  1. 每收到一个数据报文, 都会根据它的编号将它插入到 rcv_buf 对应的位置中;
  2. 接着检查 rcv_nxt 能否向右移动, 只有当报文的顺序正确且连续才能移动;
  3. 在上图的例子中由于 4 号报文的缺失, rcv_nxt 只能处于 4 号位置等待,5, 6 号报文也不能移动到 rcv_queue 中;
  4. 等到 4 号报文到达后,才能将 4, 5, 6 号报文一并移动到 rcv_queue 中,同时 rcv_nxt 会右移到 7 号位置。

2.3 案例分析

我们举个简单的例子演示整个 ARQ 的流程。下图中实线箭头表示数据报文,虚线箭头表示 ACK。

KCP ARQ 流程

① t1 时刻发送方发送 1 号报文, 1 号报文放入发送缓冲区中, snd_una 指向 1, snd_nxt 指向 2.

② t2 至 t3 时刻发送方依次发送 2 至 3 号报文, snd_nxt 依次后移.

③ 1 号报文丢包.

④ t4, t5 时刻接收方收到 3 号和 2 号报文, 放入 rcv_buf 中; 随后回复 3 号和 2 号 ACK. 此时由于 1 号报文缺失, rcv_nxt 始终指向 1.

⑤ 3 号 ACK 丢包.

⑥ t7 时刻发送方收到 2 号 ACK, 将 2 号报文标记为已送达. 此时由于 3 号 ACK 丢包, 3 号报文未标记为已送达. 由于 1 号报文未确认送达, snd_una 亦指向 1.

⑦ t8 时刻 1 号报文超时, 重传.

⑧ t9 时刻接收方收到 1 号报文, 放入 rcv_buf 中; 这时 1, 2, 3 号报文顺序正确, rcv_nxt 右移到 4 号位置. 接收方回复 1 号 ACK, 同时带上 una = 4.

⑨ t10 时刻发送方收到 1 号 ACK, 将 1 号报文标记为已送达. 同时 una 表明 1, 2, 3 号报文均已送达, 因此也将 3 号报文标记为已送达. snd_una 移动到 4.

3. 超时重传

超时重传是当发送的数据包在预定时间内未被确认时,重新发送该数据包的机制。在 KCP 中,这个时间由重新传输超时(RTO)决定。KCP 计算 RTO 初始值的方法是 TCP 的标准方法, 规定在 RFC 6298 中。

这里还是贴出源码讲比较直观:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void ikcp_update_ack(ikcpcb *kcp, IINT32 rtt)
{
IINT32 rto = 0;
if (kcp->rx_srtt == 0) {
kcp->rx_srtt = rtt;
kcp->rx_rttval = rtt / 2;
} else {
long delta = rtt - kcp->rx_srtt;
if (delta < 0) delta = -delta;
kcp->rx_rttval = (3 * kcp->rx_rttval + delta) / 4;
kcp->rx_srtt = (7 * kcp->rx_srtt + rtt) / 8;
if (kcp->rx_srtt < 1) kcp->rx_srtt = 1;
}
rto = kcp->rx_srtt + _imax_(kcp->interval, 4 * kcp->rx_rttval);
kcp->rx_rto = _ibound_(kcp->rx_minrto, rto, IKCP_RTO_MAX);
}

这个计算过程笔者就不做详细介绍了,代码里面的公式读者可以尝试自行画图进行理解,这里就不花大篇幅画公式了,下面我尝试以更通俗易懂的话语解释 RTO,只需要理解它在做什么,为什么这么做,就可以了,个人觉得对公式的细节可以暂且忽略。

3.1 RTO 计算目的

KCP 的 RTO 计算是为了确定在多长时间内未收到确认(ACK)时,应该重新发送数据包。这段时间被称为重传超时时间(RTO)。计算 RTO 的目的是在网络条件变化的情况下,既能快速响应数据丢失,也能避免不必要的重传,从而保持高效的传输。

3.2 RTO 计算涉及的变量解释

RTT 和 SRTT 的概念:

  • RTT(Round-Trip Time): 是从发送一个数据包到收到其确认(ACK)所花的时间。
  • SRTT(Smoothed RTT): 是 RTT 的加权平均值,它代表了 RTT 的一个更稳定的估计值。SRTT 的目的是减少 RTT 的短期波动对 RTO 的影响。

RTT 变化值(RTT variance):网络传输时间并不总是固定的,有时会因为网络拥塞或其他原因出现波动。我们通过计算 RTT 变化值(RTT variance)来估计这种波动的大小。

为什么需要 SRTT 和 RTT 变化值:

  • SRTT 给我们一个平均的 RTT 估计值。
  • RTT 变化值告诉我们网络的波动性。如果波动很大,我们希望 RTO 更大,以免因为短暂的网络延迟就触发不必要的重传。

3.3 RTO 计算步骤

1. 初始化:初次计算时,我们没有历史 RTT 值,所以直接用第一次测量的 RTT 来初始化 SRTT,并将 RTT 变化值设为 RTT 的一半。

2. 更新 SRTT 和 RTT 变化值:

  • 每次我们测量新的 RTT,就用它来更新 SRTT 和 RTT 变化值。
  • 更新 SRTT:我们不直接替换旧的 SRTT,而是用一个平滑的方式(即加权平均),使得 SRTT 逐渐靠近新 RTT,但又不会剧烈变化。
  • 更新 RTT 变化值:计算新的 RTT 与 SRTT 的差值,用这个差值来更新 RTT 变化值,使其反映当前网络波动的大小。

3. 计算 RTO:

  • 用 SRTT 加上四倍的 RTT 变化值来计算 RTO,这样可以确保 RTO 足够长,能涵盖大部分的网络波动。
  • 我们还要确保 RTO 不小于一个最小值(rx_minrto),以防止 RTO 过小导致频繁重传;也不能大于一个最大值(IKCP_RTO_MAX),以防止 RTO 过大影响响应速度。

4. RTO 计算效果

  • 稳定的传输: SRTT 提供了一个稳定的平均 RTT 估计,使得 RTO 能适应网络的长期变化。
  • 适应网络波动: RTT 变化值使得 RTO 能够应对网络的短期波动,减少因短暂延迟而导致的重传。
  • 快速响应: RTO 设置合理后,能够在数据丢失时快速重传,保持传输的高效和及时性。

通过这样的计算方式,KCP 能够在不同的网络条件下,自动调整重传策略,从而在保证数据可靠性的同时,保持较高的传输效率。

4. 快速重传

在网络传输中,数据包可能会由于网络拥塞、丢包等原因而丢失。超时重传依赖于重传超时时间(RTO)来判断是否需要重传,这可能会导致响应延迟。而快速重传通过检测重复的确认包(ACK)来快速判断数据包的丢失,并立即触发重传,显著缩短了数据丢失的恢复时间。

KCP 快速重传

4.1 何时快速重传?

  • 每个报文的 fastack 记录了它检测到 ACK 失序的次数,每当 KCP 收到一个编号为 sn 的 ACK 时,就会检查 snd_buf 中编号小于 sn 且未确认送达的报文,并将其 fastack 加 1。
  • 可以通过配置 fastresend 指定失序多少次就执行快速重传。
  • 每次调用 ikcp_flush 都会重传 snd_buf 中 fastask >= fastresend 的报文。

4.2 无限快速重传吗?

  • 每个报文的 xmit 记录它被传输的次数,可以配置 fastlimit 规定传输次数小于 fastlimit 的报文才能执行快速重传。

5. 比较 TCP 的超时重传和快速重传

TCP 也实现了类似的机制,但在复杂性和应用场景上有所不同。

5.1 TCP 的超时重传

1. RTT 估算:

  • TCP 通过接收确认包来估算 RTT,并使用 RTT 的变化范围来计算 RTO。

  • TCP 使用 Jacobson/Karels 算法进行 RTT 估算和 RTO 计算:

    1
    2
    3
    4
    // SRTT and RTTVAR calculation
    RTTVAR = (1 - β) * RTTVAR + β * |RTTsample - SRTT|
    SRTT = (1 - α) * SRTT + α * RTTsample
    RTO = SRTT + 4 * RTTVAR

    其中,SRTT 是平滑的 RTT,RTTVAR 是 RTT 的变化范围,α 和 β 是权重因子。

2. 重传策略:

  • 如果在 RTO 时间内未收到 ACK,TCP 会重传未确认的数据包。
  • 每次重传,RTO 值会按照指数增长(指数退避算法)。

3. 拥塞控制:

  • TCP 使用复杂的拥塞控制机制,如慢启动、拥塞避免等,来调整发送窗口和传输速率。

5.2 TCP 的快速重传

  • 当接收到三个重复的 ACK 时,TCP 会立即重传丢失的数据包,而不等待 RTO 超时。
  • 快速重传后,TCP 进入快速恢复状态,调整拥塞窗口,避免拥塞窗口过度收缩。

5.3 比较分析

特性 KCP TCP
RTT 估算 基于加权移动平均,较为简单 使用 Jacobson/Karels 算法,复杂但精确
RTO 计算 简化的计算公式 基于 RTT 的复杂计算
重传机制 超时重传和快速重传 超时重传和快速重传
拥塞控制 简单的拥塞控制,适合低延迟应用 复杂的拥塞控制,适合广泛的传输场景
适用场景 实时应用,如游戏、视频会议 通用应用,如文件传输、HTTP
实现复杂度 较为简单,易于理解和实现 复杂,需处理更多的网络状态和控制
可靠性 依赖于用户自定义的重传和控制策略 内置可靠性和流控制机制
响应速度 高效快速,适用于低延迟和高吞吐量场景 可靠但响应速度较慢,适合稳定传输场景

KCP 和 TCP 都提供了可靠的传输机制,但它们适用于不同的应用场景。KCP 设计简单,适合对延迟敏感的实时应用,而 TCP 拥有完善的拥塞控制和可靠性机制,适合广泛的网络应用。

6. 拥塞控制

拥塞控制是网络传输协议中的一个重要机制,用于防止发送过多的数据包导致网络拥塞。在 KCP 中,拥塞控制相对简单,主要通过发送窗口(snd_wnd)和拥塞窗口(cwnd)来管理数据发送速率。

6.1 三种策略

KCP 有 3 种拥塞控制的策略:

  • 慢启动(slow start)
  • 拥塞避免(congestion avoidance)
  • 快速恢复(fast recovery)

慢启动:先将 cwnd 设置为 1,随后平均每经过一个 RTT 时间,cwnd = cwnd * 2,直到阈值 ssthresh

拥塞避免:cwnd 到 ssthresh 后,cwnd 呈线性增长。

当慢启动或者拥塞避免造成 丢包 后,就采取相应的退让策略:

  1. fastack >= fastresend -> 发生快速重传:将 ssthresh = cwnd / 2cwnd = ssthresh + fastresend 进入快恢复
  2. current >= resentts -> 超时重传:ssthresh = ssthresh / 2cwnd = 1,进入慢启动
拥塞控制中 cwnd 和 ssthresh 的变化情况

6.2 核心概念

KCP 的拥塞控制基于以下几个核心概念:

  • 发送窗口 (snd_wnd):表示发送端在未收到接收端确认之前,允许发送的数据包的数量。它类似于 TCP 中的发送窗口,控制了数据流的速率。
  • 接收窗口 (rcv_wnd):表示接收端能够处理的最大数据包数量。发送端通过接收端的窗口大小来调整自己的发送速率。
  • 远端窗口 (rmt_wnd):表示接收端的窗口大小,发送端会根据这个值调整自己的发送窗口,以避免发送的数据超出接收端的处理能力。
  • 拥塞窗口 (cwnd):用于控制传输中的数据包数量。它基于网络的拥塞情况动态调整,以避免网络拥塞。
  • 慢启动阈值 (ssthresh):用于确定拥塞控制的模式。当 cwnd 小于 ssthresh 时,KCP 处于慢启动模式,否则进入拥塞避免模式。

6.3 窗口探测(Window Probing)

在某些情况下,接收端的窗口可能会被关闭(即 rmt_wnd 为 0),这意味着接收端无法接收任何新的数据。为了应对这种情况,KCP 实现了窗口探测机制:

  • rmt_wnd 为 0 时,KCP 不会立即停止发送数据,而是会定期发送一个探测包,以检测接收端窗口是否已经打开。
  • 这个探测包会触发接收端返回一个 ACK,其中包含最新的接收窗口大小信息。

6.4 调节和配置

KCP 的拥塞控制机制提供了一些配置参数,用户可以通过调整这些参数来优化传输性能:

  • snd_wnd: 发送窗口大小,用户可以根据应用的需求调整该值,以控制数据发送的最大量。
  • rcv_wnd: 接收窗口大小,表示接收端能够处理的最大数据包数量。
  • ssthresh: 慢启动阈值,初始值通常设置为较大的一个常量,用户可以根据网络情况调整。
  • cwnd: 拥塞窗口大小,初始值通常设置为 1,随传输情况动态调整。

7. 比较 TCP 的拥塞控制

7.1 四个阶段

TCP 拥塞控制有四个关键阶段

慢启动(Slow Start)

  • 目的:快速探测网络的可用带宽。
  • 机制:当一个连接刚建立或者从丢包恢复时,cwnd(拥塞窗口)从一个较小的值(通常是 1 个 MSS,即最大报文段大小)开始,并以指数增长的方式增加。
  • 过程:每次收到一个 ACK,cwnd 增加一个 MSS,使得 cwnd 每 RTT 增加一倍,直到 cwnd 达到慢启动阈值(ssthresh)。

拥塞避免(Congestion Avoidance):

  • 目的:逐步探测网络的最大容量,并避免拥塞。
  • 机制:当 cwnd 达到或超过 ssthresh 时,TCP 进入拥塞避免阶段,此时 cwnd 以线性增长的方式增加。
  • 过程:每个 RTT,cwnd 增加 1/cwnd 个 MSS,这种增长方式较为保守,旨在防止过度发送导致的拥塞。

快速重传(Fast Retransmit):

  • 目的:快速响应丢包,提高传输效率。
  • 机制:当发送端收到三个重复的 ACK 时,立即重传被确认丢失的数据包,而不等待 RTO 超时。
  • 过程:快速重传的目的是迅速恢复丢失的数据包,从而减少因丢包导致的等待时间。

快速恢复(Fast Recovery):

  • 目的:在拥塞后快速恢复到适当的传输速率。
  • 机制:在快速重传后,TCP 不会直接进入慢启动,而是保持 cwnd 的一部分,以较快的速度恢复到拥塞避免状态。
  • 过程:将 ssthresh 设置为当前 cwnd 的一半,cwnd 被临时减小,然后在接收新 ACK 时快速增加 cwnd,直到恢复到 ssthresh 为止。

7.2 比较分析

特性 TCP KCP
实现复杂度 复杂,包含多个阶段和算法 简单,主要通过窗口大小控制
拥塞检测 通过 RTT 估算和 ACK 检测丢包 主要通过 ACK 和窗口大小检测丢包
响应速度 响应相对较慢,适合稳定传输 响应较快,适合实时性高的传输
适应性 能适应广泛的网络条件 适应性较好,但更适合低延迟网络
配置灵活性 较为固定,依赖于系统配置和优化 提供更多的配置选项,用户可根据需求调整
应用场景 适用于各种需要可靠传输的应用 适用于实时性要求高的应用,如游戏和视频会议
窗口调整 慢启动、拥塞避免、快速重传、快速恢复等机制 主要通过发送窗口和拥塞窗口调整
丢包响应 丢包时通过减小 cwndssthresh 来调整 丢包时迅速调整 cwnd 和重传
拥塞控制策略 慢启动、拥塞避免、快速重传、快速恢复等多种策略 主要通过调整 cwndssthresh 进行简单控制
优点 稳定可靠、机制全面、应用广泛 实现简单、响应快、灵活性高、适合实时应用
缺点 复杂、响应慢、初始阶段保守 无法应对更加复杂的网络状况、应用场景有限

TCP 和 KCP 都有各自的拥塞控制机制,适用于不同的应用场景。TCP 提供了复杂而全面的拥塞控制,适合于各种网络条件下的可靠传输,而 KCP 提供了简单高效的控制机制,适合于低延迟和高响应速度的实时应用。选择使用哪种协议取决于具体的应用需求和网络环境。

源码分析

1. 核心数据结构

1.1 IKCPSEG 报文段结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct IKCPSEG {
struct IQUEUEHEAD node; // 链表节点
IUINT32 conv; // 会话ID
IUINT32 cmd; // 命令类型
IUINT32 frg; // 分片序号
IUINT32 wnd; // 窗口大小
IUINT32 ts; // 时间戳
IUINT32 sn; // 序列号
IUINT32 una; // 待接收的下一个包序号
IUINT32 len; // 数据长度
IUINT32 resendts; // 重传时间戳
IUINT32 rto; // 超时重传时间
IUINT32 fastack; // 快速重传计数器
IUINT32 xmit; // 传输次数
char data[1]; // 数据
};

1.2 IKCPCB 控制块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
struct IKCPCB {
// === 基础配置 ===
IUINT32 conv; // 会话ID,用于标识一个会话
IUINT32 mtu; // 最大传输单元,默认1400字节
IUINT32 mss; // 最大报文段大小,默认mtu-24字节
IUINT32 state; // 连接状态,0=正常,-1=断开

// === 发送和接收序号 ===
IUINT32 snd_una; // 第一个未确认的包序号
IUINT32 snd_nxt; // 下一个待发送的包序号
IUINT32 rcv_nxt; // 待接收的下一个包序号

// === 时间戳相关 ===
IUINT32 ts_recent; // 最近一次收到包的时间戳
IUINT32 ts_lastack; // 最近一次收到ACK的时间戳
IUINT32 ssthresh; // 慢启动阈值,默认为IKCP_THRESH_INIT(2)

// === RTT相关 ===
IINT32 rx_rttval; // RTT的变化量
IINT32 rx_srtt; // 平滑后的RTT
IINT32 rx_rto; // 超时重传时间,初始为IKCP_RTO_DEF(200ms)
IINT32 rx_minrto; // 最小重传超时时间,默认为IKCP_RTO_MIN(100ms)

// === 窗口相关 ===
IUINT32 snd_wnd; // 发送窗口大小,默认32
IUINT32 rcv_wnd; // 接收窗口大小,默认128
IUINT32 rmt_wnd; // 远端窗口大小,默认128
IUINT32 cwnd; // 拥塞窗口大小,初始为0
IUINT32 probe; // 探测标志,用于窗口探测

// === 时间相关 ===
IUINT32 current; // 当前时间
IUINT32 interval; // 内部更新时间间隔,默认100ms
IUINT32 ts_flush; // 下次刷新时间
IUINT32 xmit; // 总重传次数

// === 队列计数器 ===
IUINT32 nrcv_buf; // 接收缓存中的包数量
IUINT32 nsnd_buf; // 发送缓存中的包数量
IUINT32 nrcv_que; // 接收队列中的包数量
IUINT32 nsnd_que; // 发送队列中的包数量

// === 配置标志 ===
IUINT32 nodelay; // 是否启用nodelay模式,0=不启用
IUINT32 updated; // 是否调用过update

// === 探测相关 ===
IUINT32 ts_probe; // 下次探测时间
IUINT32 probe_wait; // 探测等待时间

// === 链路控制 ===
IUINT32 dead_link; // 最大重传次数,默认为IKCP_DEADLINK(20)
IUINT32 incr; // 可发送的最大数据量

// === 数据队列 ===
struct IQUEUEHEAD snd_queue; // 发送队列
struct IQUEUEHEAD rcv_queue; // 接收队列
struct IQUEUEHEAD snd_buf; // 发送缓存
struct IQUEUEHEAD rcv_buf; // 接收缓存

// === ACK相关 ===
IUINT32 *acklist; // ACK列表
IUINT32 ackcount; // ACK数量
IUINT32 ackblock; // ACK列表大小

// === 用户相关 ===
void *user; // 用户数据指针
char *buffer; // 临时缓存

// === 快速重传相关 ===
int fastresend; // 触发快速重传的重复ACK个数
int fastlimit; // 快速重传次数限制,默认IKCP_FASTACK_LIMIT(5)

// === 其他配置 ===
int nocwnd; // 是否关闭拥塞控制,0=不关闭
int stream; // 是否为流模式,0=消息模式(默认),1=流模式
int logmask; // 日志掩码,控制日志输出级别

// === 回调函数 ===
// 数据输出回调,用于发送数据
int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user);
// 日志输出回调
void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

这个结构体可以大致分为几个主要部分:

  • 基础配置:包含基本的会话标识和传输单元大小设置
  • 序号追踪:用于追踪发送和接收的包序号
  • 时间管理:包含各种时间戳和定时器
  • 窗口控制:实现流量控制和拥塞控制
  • 队列管理:管理数据的发送和接收
  • ACK处理:处理确认包
  • 配置选项:各种功能开关和参数设置
  • 回调函数:用于数据输出和日志记录

2. 核心函数

在进入具体的核心函数分析之前,需要先点明 2 点,kcp 的实现者期望其尽可能地简单和减少依赖,所以数据的输出甚至是当前时间都是由使用者来设置的,即 kcp 本身是不依赖于机器时钟的。具体体现在下面 2 个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//---------------------------------------------------------------------
// set output callback, which will be invoked by kcp
//---------------------------------------------------------------------
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user))
{
kcp->output = output;
}

//---------------------------------------------------------------------
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
...
}

2.1 ikcp_send 发送数据

ikcp_send 是应用层接口,负责将用户数据分片并加入到发送队列(snd_queue)。

ikcp_send
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//---------------------------------------------------------------------
// user/upper level send, returns below zero for error
//---------------------------------------------------------------------
int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
IKCPSEG *seg;
int count, i;
int sent = 0;

// mtu: 最大传输单元
// mss: 最大报文段大小
// mss = mtu - 包头长度(24)
assert(kcp->mss > 0);
if (len < 0) return -1;

// append to previous segment in streaming mode (if possible)
// 如果是流模式,则将数据追加到前一个分段中(如果可能)
if (kcp->stream != 0) {
// 如果当前发送队列不为空,且前一个分段未满,则将数据追加到前一个分段中
if (!iqueue_is_empty(&kcp->snd_queue)) {
IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
if (old->len < kcp->mss) {
int capacity = kcp->mss - old->len;
int extend = (len < capacity)? len : capacity;
seg = ikcp_segment_new(kcp, old->len + extend);
assert(seg);
if (seg == NULL) {
return -2;
}
// 将新的 seg->node 放入 snd_queue 中等待发送
iqueue_add_tail(&seg->node, &kcp->snd_queue);
// 把上一个报文的数据拷贝过来
memcpy(seg->data, old->data, old->len);
if (buffer) {
memcpy(seg->data + old->len, buffer, extend);
buffer += extend;
}
seg->len = old->len + extend;
seg->frg = 0;
len -= extend;
iqueue_del_init(&old->node);
// 释放之前老数据的 kcp node
ikcp_segment_delete(kcp, old);
sent = extend;
}
}
if (len <= 0) {
return sent;
}
}

// 1. 非流模式,不追加到上一个报文后面
// 2. 流模式,但是上一个报文已满,则创建新的报文

// 计算需要的报文数量,kcp 会对数据进行分段传输
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;

// 接收窗口位置不够,则暂停发送
if (count >= (int)IKCP_WND_RCV) {
if (kcp->stream != 0 && sent > 0)
return sent;
return -2;
}

if (count == 0) count = 1;

// 发送所有的报文段
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);

// 将报文段放入 snd_queue 中
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
sent += size;
}

return sent;
}

2.2 ikcp_input 接收数据

ikcp_input 负责处理从网络接收到的原始 KCP 数据包,它会处理协议层面的数据,包括 ACK、窗口控制等协议信息,并将接收到的数据放入 KCP 的内部接收缓冲区(rcv_bufrcv_queue)。

2.3 ikcp_recv 获取数据

ikcp_recv 是应用层函数,供上层应用调用以获取完整的消息数据,它从 KCP 的接收队列(rcv_queue)中读取已经排序好的数据,处理分片重组,确保返回完整的消息。

ikcp_recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//---------------------------------------------------------------------
// user/upper level recv: returns size, returns below zero for EAGAIN
// 从 rcv_queue 中获取数据
//---------------------------------------------------------------------
int ikcp_recv(ikcpcb *kcp, char *buffer, int len)
{
struct IQUEUEHEAD *p;
int ispeek = (len < 0)? 1 : 0;
int peeksize;
int recover = 0;
IKCPSEG *seg;
assert(kcp);

// 如果 rcv_queue 为空,则直接返回
if (iqueue_is_empty(&kcp->rcv_queue))
return -1;

// 如果 len < 0,则说明是 peek 操作,准备只查看数据
if (len < 0) len = -len;

// 计算 rcv_queue 中数据的大小
peeksize = ikcp_peeksize(kcp);

// 无法获得大小,返回 -2
if (peeksize < 0)
return -2;

// 数据过大,返回 -3
if (peeksize > len)
return -3;

// nrcv_que: rcv_queue 的长度
// rcv_wnd: 接收窗口的大小
// 如果 nrcv_que >= rcv_wnd,则需要进行快恢复
// 因为 nrcv_que >= rcv_wnd,说明接收窗口已经满了,
// 这个时候需要发送 IKCP_CMD_WINS 告诉发送方窗口大小,
// 这个时候发送方需要进行快恢复,减小数据传输,以尽快释放接收窗口
if (kcp->nrcv_que >= kcp->rcv_wnd)
recover = 1;

// merge fragment
// 将多个片段合并成一个完整的片段
// 合并后,将合并后的片段从 rcv_queue 中删除
for (len = 0, p = kcp->rcv_queue.next; p != &kcp->rcv_queue; ) {
int fragment;
seg = iqueue_entry(p, IKCPSEG, node);
p = p->next;

if (buffer) {
memcpy(buffer, seg->data, seg->len);
buffer += seg->len;
}

len += seg->len;
fragment = seg->frg;

if (ikcp_canlog(kcp, IKCP_LOG_RECV)) {
ikcp_log(kcp, IKCP_LOG_RECV, "recv sn=%lu", (unsigned long)seg->sn);
}

if (ispeek == 0) {
iqueue_del(&seg->node);
ikcp_segment_delete(kcp, seg);
kcp->nrcv_que--;
}

if (fragment == 0)
break;
}

assert(len == peeksize);

// move available data from rcv_buf -> rcv_queue
// 尝试将 rcv_buf 中编号连续的数据,移动到 rcv_queue 中
// 移动后,将移动的数据从 rcv_buf 中删除
while (! iqueue_is_empty(&kcp->rcv_buf)) {
seg = iqueue_entry(kcp->rcv_buf.next, IKCPSEG, node);
if (seg->sn == kcp->rcv_nxt && kcp->nrcv_que < kcp->rcv_wnd) {
iqueue_del(&seg->node);
kcp->nrcv_buf--;
iqueue_add_tail(&seg->node, &kcp->rcv_queue);
kcp->nrcv_que++;
kcp->rcv_nxt++;
} else {
break;
}
}

// 快恢复
if (kcp->nrcv_que < kcp->rcv_wnd && recover) {
// 在ikcp_flush 中返回 IKCP_CMD_WINS
// 通知本段窗口大小给对端
kcp->probe |= IKCP_ASK_TELL;
}

return len;
}

2.4 ikcp_update 定时时钟

前面我们看了 ikcp_sendikcp_inputikcp_recv 三个核心流程的函数,其中的一些细节,你可以回到本文前面的「原理分析」再对照源码仔细阅读。

在前面的原理分析中,我们提到,为了提高传输和处理数据的效率,kcp 设计了队列和缓冲区,同时为了实现可靠性,kcp 也提供了 ACK 和重试、拥塞控制等机制,这些事情都是周期定时去处理的。这里是由 ikcp_update 函数去处理的。

ikcp_update 是 KCP 的定时器函数,负责以固定间隔调用 ikcp_flush 处理数据发送和协议更新,是 KCP 的"心跳"机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//---------------------------------------------------------------------
// update state (call it repeatedly, every 10ms-100ms), or you can ask
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
//---------------------------------------------------------------------
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;

kcp->current = current;

if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}

// 计算间隔
slap = _itimediff(kcp->current, kcp->ts_flush);

if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}

// 达到调用间隔,则执行 ikcp_flush 进行接收数据或发送数据
if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}

这个函数很简单,根据注释所说,通常情况下会每 10ms~100ms 执行一次,然后核心是去调用 ikcp_flush 函数,所有的逻辑都在里面。

2.5 ikcp_flush 定时处理

如上所述,ikcp_flush 是 KCP 的核心发送函数,负责将发送队列 snd_queue 中的数据移入发送缓存 snd_buf 并通过 output 回调发送出去,同时处理 ACK 发送、快速重传、超时重传和窗口探测等协议细节。

ikcp_flush
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
void ikcp_flush(ikcpcb *kcp)
{
IUINT32 current = kcp->current; // 当前时间
char *buffer = kcp->buffer; // 临时缓冲区
char *ptr = buffer;
int count, size, i;
IUINT32 resent, cwnd;
IUINT32 rtomin;
struct IQUEUEHEAD *p;
int change = 0; // 是否执行过快速重传
int lost = 0; // 是否执行过超时重传
IKCPSEG seg;

// 检查是否已调用 ikcp_update
if (kcp->updated == 0) return;

// 初始化一个段用于构建各种控制包
seg.conv = kcp->conv; // 连接标识
seg.cmd = IKCP_CMD_ACK; // 报文类型:IKCP_CMD_ACK 表示确认报文
seg.frg = 0; // 分片数量,表示随后还有多少个报文属于同一个包
seg.wnd = ikcp_wnd_unused(kcp); // 发送方剩余接收窗口的大小
seg.una = kcp->rcv_nxt; // 发送方的接收缓冲区中最小还未收到的报文段的编号,也就是说,编号比它小的报文段都已全部接收
seg.len = 0; // 数据段长度
seg.sn = 0; // 报文编号
seg.ts = 0; // 时间戳

// flush acknowledges
// ① 发送 ACK 队列中的所有 ACK
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
// buffer 中累计的数据将要超过 mtu 的时候
// 就调用 ikcp_output 将数据发送出去
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
// 从 ACK 列表中取出 sn(报文编号)和 ts(时间戳)
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
// 将 ACK 报文写入 buffer
ptr = ikcp_encode_seg(ptr, &seg);
}
// ② ACK 队列已清空
kcp->ackcount = 0;

// probe window size (if remote window size equals zero)
// 对端剩余接收窗口大小为 0,则意味着可能需要发送窗口探测报文:IKCP_CMD_WASK
if (kcp->rmt_wnd == 0) {
// 根据 ts_probe 和 probe_wait 确定当前时刻是否需要发送探测报文
// probe_wait: 等待发送探测报文的时间,IKCP_PROBE_INIT=7s, IKCP_PROBE_LIMIT=
if (kcp->probe_wait == 0) {
kcp->probe_wait = IKCP_PROBE_INIT; // 7s 后去发探测报文
kcp->ts_probe = kcp->current + kcp->probe_wait;
}
else {
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
kcp->probe |= IKCP_ASK_SEND; // 设置是否需要去发送 IKCP_ASK_SEND
}
}
} else {
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}

// flush window probing commands
// ③ 如果需要,则发送窗口探测报文:IKCP_CMD_WASK
if (kcp->probe & IKCP_ASK_SEND) {
seg.cmd = IKCP_CMD_WASK;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}

// flush window probing commands
// ④ 如果需要,则发送窗口通知报文:IKCP_CMD_WINS
if (kcp->probe & IKCP_ASK_TELL) {
seg.cmd = IKCP_CMD_WINS;
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ptr = ikcp_encode_seg(ptr, &seg);
}

kcp->probe = 0;

// calculate window size
// ⑤ 计算当前窗口大小
cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);

// move data from snd_queue to snd_buf
// 5.1 如果符合发送的条件,则创建新的 newseg 并放入 snd_buf 的尾部
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;

newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;

newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}

// calculate resent
// 失序多少次就快速重传。如果 fastresend 大于 0,则取其值;否则,设为最大值 0xffffffff。
resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
// 最小超时重传时间。如果 nodelay 为 0,则为 rx_rto 的八分之一,否则为 0。
rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

// flush data segments
for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
// 从 snd_buf 取出一个报文
IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
int needsend = 0;
// 条件1:第一次发送的报文,直接发送
if (segment->xmit == 0) { // 该报文的 xmit 传输次数
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
// 条件2:且重传时间到了,则重传
needsend = 1;
segment->xmit++;
kcp->xmit++;
if (kcp->nodelay == 0) {
segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
} else {
IINT32 step = (kcp->nodelay < 2)?
((IINT32)(segment->rto)) : kcp->rx_rto;
segment->rto += step / 2;
}
segment->resendts = current + segment->rto;
lost = 1;
}
else if (segment->fastack >= resent) {
// 条件3:达到快速重传次数,则重传
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
change++;
}
}

if (needsend) {
int need;
segment->ts = current;
segment->wnd = seg.wnd;
segment->una = kcp->rcv_nxt;

size = (int)(ptr - buffer);
need = IKCP_OVERHEAD + segment->len;

if (size + need > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}

ptr = ikcp_encode_seg(ptr, segment);

if (segment->len > 0) {
memcpy(ptr, segment->data, segment->len);
ptr += segment->len;
}

// 如果某个数据包的重传次数超过阈值,则标记连接断开。
if (segment->xmit >= kcp->dead_link) {
kcp->state = (IUINT32)-1;
}
}
}

// flash remain segments
size = (int)(ptr - buffer);
if (size > 0) {
ikcp_output(kcp, buffer, size);
}

// update ssthresh
// 1. 如果发生了快速重传,让 ssthresh 减半,进入快恢复
if (change) {
IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
kcp->ssthresh = inflight / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = kcp->ssthresh + resent;
kcp->incr = kcp->cwnd * kcp->mss;
}

// 2. 如果发生了超时重传,则让 ssthresh 减半,然后 cwnd = 1,进入慢启动
if (lost) {
kcp->ssthresh = cwnd / 2;
if (kcp->ssthresh < IKCP_THRESH_MIN)
kcp->ssthresh = IKCP_THRESH_MIN;
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}

// 兜底,cwnd 至少为 1
if (kcp->cwnd < 1) {
kcp->cwnd = 1;
kcp->incr = kcp->mss;
}
}

参考


KCP 源码分析与原理总结
https://hedon.top/2024/06/12/kcp/
Author
Hedon Wang
Posted on
2024-06-12
Licensed under