【SpringBoot WEB系列】SSE 服務器發送事件詳解
SSE 全稱Server Sent Event,直譯一下就是服務器發送事件,一般的項目開發中,用到的機會不多,可能很多小夥伴不太清楚這個東西,到底是幹啥的,有啥用
本文主要知識點如下:
- SSE 掃盲,應用場景分析
- 藉助異步請求實現 sse 功能,加深概念理解
- 使用SseEmitter實現一個簡單的推送示例
I. SSE 掃盲
對於 sse 基礎概念比較清楚的可以跳過本節
1. 概念介紹
sse(Server Sent Event),直譯為服務器發送事件,顧名思義,也就是客戶端可以獲取到服務器發送的事件
我們常見的 http 交互方式是客戶端發起請求,服務端響應,然後一次請求完畢;但是在 sse 的場景下,客戶端發起請求,連接一直保持,服務端有數據就可以返回數據給客戶端,這個返回可以是多次間隔的方式
2. 特點分析
SSE 最大的特點,可以簡單規劃為兩個
- 長連接
- 服務端可以向客戶端推送信息
瞭解 websocket 的小夥伴,可能也知道它也是長連接,可以推送信息,但是它們有一個明顯的區別
sse 是單通道,只能服務端向客戶端發消息;而 webscoket 是雙通道
那麼為什麼有了 webscoket 還要搞出一個 sse 呢?既然存在,必然有著它的優越之處
sse websocket http 協議 獨立的 websocket 協議 輕量,使用簡單 相對複雜 默認支持斷線重連 需要自己實現斷線重連 文本傳輸 二進制傳輸 支持自定義發送的消息類型 -
3. 應用場景
從 sse 的特點出發,我們可以大致的判斷出它的應用場景,需要輪詢獲取服務端最新數據的 case 下,多半是可以用它的
比如顯示當前網站在線的實時人數,法幣匯率顯示當前實時匯率,電商大促的實時成交額等等...
II. 手動實現 sse 功能
sse 本身是有自己的一套玩法的,後面會進行說明,這一小節,則主要針對 sse 的兩個特點長連接 + 後端推送數據,如果讓我們自己來實現這樣的一個接口,可以怎麼做?
1. 項目創建
藉助 SpringBoot 2.2.1.RELEASE來創建一個用於演示的工程項目,核心的 xml 依賴如下
<code><parent>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-parent/<artifactid>
<version>2.2.1.RELEASE/<version>
<relativepath>
/<parent>
<properties>
<project.build.sourceencoding>UTF-8/<project.build.sourceencoding>
<project.reporting.outputencoding>UTF-8/<project.reporting.outputencoding>
<java.version>1.8/<java.version>
/<properties>
<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
/<dependencies>
<build>
<pluginmanagement>
<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
/<plugin>
/<plugins>
/<pluginmanagement>
/<build>
<repositories>
<repository>
spring-snapshots
<name>Spring Snapshots/<name>
https://repo.spring.io/libs-snapshot-local
<snapshots>
<enabled>true/<enabled>
/<snapshots>
/<repository>
<repository>
spring-milestones
<name>Spring Milestones/<name>
https://repo.spring.io/libs-milestone-local
<snapshots>
<enabled>false/<enabled>
/<snapshots>
/<repository>
<repository>
spring-releases
<name>Spring Releases/<name>
https://repo.spring.io/libs-release-local
<snapshots>
<enabled>false/<enabled>
/<snapshots>
/<repository>
/<repositories>/<code>
2. 功能實現
在 Http1.1 支持了長連接,請求頭添加一個Connection: keep-alive即可
在這裡我們藉助異步請求來實現 sse 功能,至於什麼是異步請求,推薦查看博文: 【WEB 系列】異步請求知識點與使用姿勢小結
因為後端可以不定時返回數據,所以我們需要注意的就是需要保持連接,不要返回一次數據之後就斷開了;其次就是需要設置請求頭Content-Type: text/event-stream;charset=UTF-8 (如果不是流的話會怎樣?)
<code>// 新建一個容器,保存連接,用於輸出返回
private Map<string> responseMap = new ConcurrentHashMap<>();
// 發送數據給客戶端
private void writeData(String id, String msg, boolean over) throws IOException {
PrintWriter writer = responseMap.get(id);
if (writer == null) {
return;
}
writer.println(msg);
writer.flush();
if (over) {
responseMap.remove(id);
}
}
// 推送
@ResponseBody
@GetMapping(path = "subscribe")
public WebAsyncTask<void> subscribe(String id, HttpServletResponse response) {
Callable<void> callable = () -> {
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
responseMap.put(id, response.getWriter());
writeData(id, "訂閱成功", false);
while (true) {
Thread.sleep(1000);
if (!responseMap.containsKey(id)) {
break;
}
}
return null;
};
// 採用WebAsyncTask 返回 這樣可以處理超時和錯誤 同時也可以指定使用的Excutor名稱
WebAsyncTask<void> webAsyncTask = new WebAsyncTask<>(30000, callable);
// 注意:onCompletion表示完成,不管你是否超時、是否拋出異常,這個函數都會執行的
webAsyncTask.onCompletion(() -> System.out.println("程序[正常執行]完成的回調"));
// 這兩個返回的內容,最終都會放進response裡面去===========
webAsyncTask.onTimeout(() -> {
responseMap.remove(id);
System.out.println("超時了!!!");
return null;
});
// 備註:這個是Spring5新增的
webAsyncTask.onError(() -> {
System.out.println("出現異常!!!");
return null;
});
return webAsyncTask;
}/<void>/<void>/<void>/<string>/<code>
看一下上面的實現,基本上還是異步請求的那一套邏輯,請仔細看一下callable中的邏輯,有一個 while 循環,來保證長連接不中斷
接下來我們新增兩個接口,用來模擬後端給客戶端發送消息,關閉連接的場景
<code>@ResponseBody
@GetMapping(path = "push")
public String pushData(String id, String content) throws IOException {
writeData(id, content, false);
return "over!";
}
@ResponseBody
@GetMapping(path = "over")
public String over(String id) throws IOException {
writeData(id, "over", true);
return "over!";
}/<code>
我們簡單的來演示下操作過程
III. SseEmitter
上面只是簡單實現了 sse 的長連接 + 後端推送消息,但是與標準的 SSE 還是有區別的,sse 有自己的規範,而我們上面的實現,實際上並沒有管這個,導致的問題是前端按照 sse 的玩法來請求數據,可能並不能正常工作
1. sse 規範
在 html5 的定義中,服務端 sse,一般需要遵循以下要求
請求頭
開啟長連接 + 流方式傳遞
<code>Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive/<code>
數據格式
服務端發送的消息,由 message 組成,其格式如下:
<code>field:value\\n\\n/<code>
其中 field 有五種可能
- 空: 即以:開頭,表示註釋,可以理解為服務端向客戶端發送的心跳,確保連接不中斷
- data:數據
- event: 事件,默認值
- id: 數據標識符用 id 字段表示,相當於每一條數據的編號
- retry: 重連時間
2. 實現
SpringBoot 利用 SseEmitter 來支持 sse,可以說非常簡單了,直接返回SseEmitter對象即可;重寫一下上面的邏輯
<code>@RestController
@RequestMapping(path = "sse")
public class SseRest {
private static Map<string> sseCache = new ConcurrentHashMap<>();
@GetMapping(path = "subscribe")
public SseEmitter push(String id) {
// 超時時間設置為1小時
SseEmitter sseEmitter = new SseEmitter(3600_000L);
sseCache.put(id, sseEmitter);
sseEmitter.onTimeout(() -> sseCache.remove(id));
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(content);
}
return "over";
}
@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}
}/<string>/<code>
上面的實現,用到了 SseEmitter 的幾個方法,解釋如下
- send(): 發送數據,如果傳入的是一個非SseEventBuilder對象,那麼傳遞參數會被封裝到 data 中
- complete(): 表示執行完畢,會斷開連接
- onTimeout(): 超時回調觸發
- onCompletion(): 結束之後的回調觸發
同樣演示一下訪問請求
上圖總的效果和前面的效果差不多,而且輸出還待上了前綴,接下來我們寫一個簡單的 html 消費端,用來演示一下完整的 sse 的更多特性
<code>
<title>Sse測試文檔/<title>sse測試
/<code>
將上面的 html 文件放在項目的resources/static目錄下;然後修改一下前面的SseRest
<code>@Controller
@RequestMapping(path = "sse")
public class SseRest {
@GetMapping(path = "")
public String index() {
return "index.html";
}
@ResponseBody
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) {
// 超時時間設置為3s,用於演示客戶端自動重連
SseEmitter sseEmitter = new SseEmitter(1_000L);
// 設置前端的重試時間為1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {
System.out.println(id + "超時");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
}/<code>
我們上面超時時間設置的比較短,用來測試下客戶端的自動重連,如下,開啟的日誌不斷增加
其次將 SseEmitter 的超時時間設長一點,再試一下數據推送功能
請注意上面的演示,當後端結束了長連接之後,客戶端會自動重新再次連接,不用寫額外的重試邏輯了,就這麼神奇
3. 小結
本篇文章介紹了 SSE 的相關知識點,並對比 websocket 給出了 sse 的優點(至於啥優點請往上翻)
請注意,本文雖然介紹了兩種 sse 的方式,第一種藉助異步請求來實現,如果需要完成 sse 的規範要求,需要自己做一些適配,如果需要了解 sse 底層實現原理的話,可以參考一下;在實際的業務開發中,推薦使用SseEmitter
源碼
- 工程:https://github.com/liuyueyi/spring-boot-demo[1]
- 項目源碼: https://github.com/liuyueyi/spring-boot-demo/blob/master/spring-boot/220-web-sse[2]
閱讀更多 小灰灰blog 的文章