Rust 实战丨SSE(Server-Sent Events)
📌 SSE(Server-Sent
Events)是一种允许服务器向客户端浏览器推送信息的技术。它是 HTML5
的一部分,专门用于建立一个单向的从服务器到客户端的通信连接。SSE的使用场景非常广泛,包括实时消息推送、实时通知更新等。
SSE 的本质
严格地说,HTTP 无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP
协议,目前除了 IE/Edge,其他浏览器都支持。
特点
持续连接 :与传统的 HTTP 请求不同,SSE
保持连接开放,服务器可以随时发送消息。
文本数据流 :SSE
主要传输文本数据,这些数据以特定的格式流式传输,使得每条消息都是简单的文本格式。
内置重连机制 :浏览器会自动处理连接中断和重连,包括在重连请求中发送最后接收的事件
ID,以便服务器从正确的位置恢复发送事件。
简单的客户端处理 :在浏览器中,使用 JavaScript 的
EventSource
接口处理 SSE
非常简单,只需几行代码即可监听服务器发来的事件。
工作原理
建立连接 :客户端通过创建一个
EventSource
对象请求特定的 URL 来启动 SSE
连接。这个请求是一个标准的 HTTP
请求,但会要求服务器以特定方式响应。
服务器响应 :服务器响应必须设置
Content-Type
为
text/event-stream
,然后保持连接打开。
发送消息 :服务器可以通过持续发送数据格式为特定事件流的消息来推送更新。每个消息包括一个可选的事件类型、数据和一个可选的
ID。
数据 :实际的消息内容,以 data:
开头,多行数据以双换行符 \n\n
结束。
事件类型 :允许客户端根据事件类型来监听,以
event:
开头。
ID :如果连接中断,客户端将发送包含上次接收的最后一个ID的
Last-Event-ID
头,以便服务器从断点继续发送数据。
实战
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <!DOCTYPE html > <html > <head > <title > SSE Test</title > </head > <body > <h1 > Server-Sent Events Test</h1 > <div id ="events" > </div > <script > var eventSource = new EventSource ('http://localhost:8000/events' ); eventSource.onmessage = function (event ) { console .log ('New event:' , event.data ); document .getElementById ('events' ).innerHTML += event.data + '<br>' ; }; </script > </body > </html >
Rust 服务端
Rust 实现演示
依赖:
1 2 3 4 5 6 7 anyhow = "1.0.86" axum = { version = "0.7.5" }chrono = "0.4.38" futures-core = "0.3.30" tokio = { version = "1.38.0" , features = ["macros" , "rt-multi-thread" , ] }tokio-stream = "0.1.15" tower-http = { version = "0.5.2" , features = ["cors" ] }
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 use std::time::Duration;use axum::{ response::{sse::Event, Sse}, routing::get, Router, }; use tokio::{net::TcpListener, time::interval};use tokio_stream::{wrappers::IntervalStream, StreamExt};use tower_http::cors::{Any, CorsLayer};#[tokio::main] async fn main () -> anyhow::Result <()> { let cors = CorsLayer::new () .allow_headers (Any) .allow_origin (Any) .allow_headers (Any) .allow_credentials (false ); let listener = TcpListener::bind ("0.0.0.0:8000" ).await ?; let app = Router::new ().route ("/events" , get (sse_handler)).layer (cors); axum::serve (listener, app).await ?; Ok (()) } async fn sse_handler () -> Sse<impl futures_core ::Stream<Item = Result <Event, axum::Error>>> { let interval = interval (Duration::from_secs (1 )); let stream = IntervalStream::new (interval).map (|_| { let data = format! ("{}\n\n" , chrono::Local::now ().to_rfc2822 ()); Ok (Event::default ().data (data)) }); Sse::new (stream) }
Go 服务端
Go 实现演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package mainimport ( "fmt" "log" "net/http" "time" ) func sseHandler (w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type" , "text/event-stream" ) w.Header().Set("Cache-Control" , "no-cache" ) w.Header().Set("Connection" , "keep-alive" ) w.Header().Set("Access-Control-Allow-Origin" , "*" ) for { now := time.Now() msg := fmt.Sprintf("data: %s\n\n" , now.Format(time.DateTime)) if _, err := fmt.Fprintf(w, msg); err != nil { log.Println("write error:" , err) break } flusher, ok := w.(http.Flusher) if !ok { log.Println("Streaming unsupported!" ) break } flusher.Flush() time.Sleep(1 * time.Second) } } func main () { http.HandleFunc("/events" , sseHandler) log.Println("Server started on port 8000..." ) log.Fatal(http.ListenAndServe(":8000" , nil )) }