【SpringBoot WEB系列】SSE 服務器發送事件詳解

【SpringBoot WEB系列】SSE 服務器發送事件詳解

【SpringBoot WEB系列】SSE 服務器發送事件詳解

SSE 全稱Server Sent Event,直譯一下就是服務器發送事件,一般的項目開發中,用到的機會不多,可能很多小夥伴不太清楚這個東西,到底是幹啥的,有啥用

本文主要知識點如下:

  • SSE 掃盲,應用場景分析
  • 藉助異步請求實現 sse 功能,加深概念理解
  • 使用SseEmitter實現一個簡單的推送示例

I. SSE 掃盲

對於 sse 基礎概念比較清楚的可以跳過本節

1. 概念介紹

sse(Server Sent Event),直譯為服務器發送事件,顧名思義,也就是客戶端可以獲取到服務器發送的事件

我們常見的 http 交互方式是客戶端發起請求,服務端響應,然後一次請求完畢;但是在 sse 的場景下,客戶端發起請求,連接一直保持,服務端有數據就可以返回數據給客戶端,這個返回可以是多次間隔的方式

2. 特點分析

SSE 最大的特點,可以簡單規劃為兩個

  • 長連接
  • 服務端可以向客戶端推送信息

瞭解 websocket 的小夥伴,可能也知道它也是長連接,可以推送信息,但是它們有一個明顯的區別

sse 是單通道,只能服務端向客戶端發消息;而 webscoket 是雙通道

那麼為什麼有了 webscoket 還要搞出一個 sse 呢?既然存在,必然有著它的優越之處

sse websocket http 協議 獨立的 websocket 協議 輕量,使用簡單 相對複雜 默認支持斷線重連 需要自己實現斷線重連 文本傳輸 二進制傳輸 支持自定義發送的消息類型 -

3. 應用場景

從 sse 的特點出發,我們可以大致的判斷出它的應用場景,需要輪詢獲取服務端最新數據的 case 下,多半是可以用它的

比如顯示當前網站在線的實時人數,法幣匯率顯示當前實時匯率,電商大促的實時成交額等等...

II. 手動實現 sse 功能

sse 本身是有自己的一套玩法的,後面會進行說明,這一小節,則主要針對 sse 的兩個特點長連接 + 後端推送數據,如果讓我們自己來實現這樣的一個接口,可以怎麼做?

1. 項目創建

藉助 SpringBoot 2.2.1.RELEASE來創建一個用於演示的工程項目,核心的 xml 依賴如下

<code><parent>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-parent/<artifactid>
<version>2.2.1.RELEASE/<version>
<relativepath>
/<parent>

<properties>
<project.build.sourceencoding>UTF-8/<project.build.sourceencoding>
<project.reporting.outputencoding>UTF-8/<project.reporting.outputencoding>
<java.version>1.8/<java.version>
/<properties>

<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
/<dependencies>

<build>
<pluginmanagement>

<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
/<plugin>
/<plugins>
/<pluginmanagement>
/<build>
<repositories>
<repository>
spring-snapshots
<name>Spring Snapshots/<name>
https://repo.spring.io/libs-snapshot-local
<snapshots>
<enabled>true/<enabled>
/<snapshots>
/<repository>
<repository>
spring-milestones
<name>Spring Milestones/<name>
https://repo.spring.io/libs-milestone-local
<snapshots>
<enabled>false/<enabled>
/<snapshots>
/<repository>
<repository>
spring-releases
<name>Spring Releases/<name>
https://repo.spring.io/libs-release-local
<snapshots>
<enabled>false/<enabled>
/<snapshots>
/<repository>
/<repositories>/<code>

2. 功能實現

在 Http1.1 支持了長連接,請求頭添加一個Connection: keep-alive即可

在這裡我們藉助異步請求來實現 sse 功能,至於什麼是異步請求,推薦查看博文: 【WEB 系列】異步請求知識點與使用姿勢小結

因為後端可以不定時返回數據,所以我們需要注意的就是需要保持連接,不要返回一次數據之後就斷開了;其次就是需要設置請求頭Content-Type: text/event-stream;charset=UTF-8 (如果不是流的話會怎樣?)

<code>// 新建一個容器,保存連接,用於輸出返回
private Map<string> responseMap = new ConcurrentHashMap<>();

// 發送數據給客戶端
private void writeData(String id, String msg, boolean over) throws IOException {
PrintWriter writer = responseMap.get(id);
if (writer == null) {
return;
}

writer.println(msg);
writer.flush();
if (over) {
responseMap.remove(id);
}
}

// 推送
@ResponseBody
@GetMapping(path = "subscribe")
public WebAsyncTask<void> subscribe(String id, HttpServletResponse response) {

Callable<void> callable = () -> {
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
responseMap.put(id, response.getWriter());
writeData(id, "訂閱成功", false);
while (true) {
Thread.sleep(1000);

if (!responseMap.containsKey(id)) {
break;
}
}
return null;
};

// 採用WebAsyncTask 返回 這樣可以處理超時和錯誤 同時也可以指定使用的Excutor名稱
WebAsyncTask<void> webAsyncTask = new WebAsyncTask<>(30000, callable);
// 注意:onCompletion表示完成,不管你是否超時、是否拋出異常,這個函數都會執行的
webAsyncTask.onCompletion(() -> System.out.println("程序[正常執行]完成的回調"));

// 這兩個返回的內容,最終都會放進response裡面去===========
webAsyncTask.onTimeout(() -> {
responseMap.remove(id);
System.out.println("超時了!!!");
return null;
});
// 備註:這個是Spring5新增的
webAsyncTask.onError(() -> {
System.out.println("出現異常!!!");
return null;
});


return webAsyncTask;
}/<void>/<void>/<void>/<string>/<code>

看一下上面的實現,基本上還是異步請求的那一套邏輯,請仔細看一下callable中的邏輯,有一個 while 循環,來保證長連接不中斷

接下來我們新增兩個接口,用來模擬後端給客戶端發送消息,關閉連接的場景

<code>@ResponseBody
@GetMapping(path = "push")
public String pushData(String id, String content) throws IOException {
writeData(id, content, false);
return "over!";
}

@ResponseBody
@GetMapping(path = "over")
public String over(String id) throws IOException {
writeData(id, "over", true);
return "over!";
}/<code>

我們簡單的來演示下操作過程

【SpringBoot WEB系列】SSE 服務器發送事件詳解

III. SseEmitter

上面只是簡單實現了 sse 的長連接 + 後端推送消息,但是與標準的 SSE 還是有區別的,sse 有自己的規範,而我們上面的實現,實際上並沒有管這個,導致的問題是前端按照 sse 的玩法來請求數據,可能並不能正常工作

1. sse 規範

在 html5 的定義中,服務端 sse,一般需要遵循以下要求

請求頭

開啟長連接 + 流方式傳遞

<code>Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive/<code>

數據格式

服務端發送的消息,由 message 組成,其格式如下:

<code>field:value\\n\\n/<code>

其中 field 有五種可能

  • 空: 即以:開頭,表示註釋,可以理解為服務端向客戶端發送的心跳,確保連接不中斷
  • data:數據
  • event: 事件,默認值
  • id: 數據標識符用 id 字段表示,相當於每一條數據的編號
  • retry: 重連時間

2. 實現

SpringBoot 利用 SseEmitter 來支持 sse,可以說非常簡單了,直接返回SseEmitter對象即可;重寫一下上面的邏輯

<code>@RestController
@RequestMapping(path = "sse")
public class SseRest {
private static Map<string> sseCache = new ConcurrentHashMap<>();
@GetMapping(path = "subscribe")
public SseEmitter push(String id) {
// 超時時間設置為1小時
SseEmitter sseEmitter = new SseEmitter(3600_000L);
sseCache.put(id, sseEmitter);
sseEmitter.onTimeout(() -> sseCache.remove(id));
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}

@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(content);
}
return "over";
}

@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.complete();

sseCache.remove(id);
}
return "over";
}
}/<string>/<code>

上面的實現,用到了 SseEmitter 的幾個方法,解釋如下

  • send(): 發送數據,如果傳入的是一個非SseEventBuilder對象,那麼傳遞參數會被封裝到 data 中
  • complete(): 表示執行完畢,會斷開連接
  • onTimeout(): 超時回調觸發
  • onCompletion(): 結束之後的回調觸發

同樣演示一下訪問請求

【SpringBoot WEB系列】SSE 服務器發送事件詳解

上圖總的效果和前面的效果差不多,而且輸出還待上了前綴,接下來我們寫一個簡單的 html 消費端,用來演示一下完整的 sse 的更多特性

<code>


<title>Sse測試文檔/<title>


sse測試




/<code>

將上面的 html 文件放在項目的resources/static目錄下;然後修改一下前面的SseRest

<code>@Controller
@RequestMapping(path = "sse")
public class SseRest {
@GetMapping(path = "")
public String index() {
return "index.html";
}

@ResponseBody
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(String id) {
// 超時時間設置為3s,用於演示客戶端自動重連
SseEmitter sseEmitter = new SseEmitter(1_000L);
// 設置前端的重試時間為1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.onTimeout(() -> {

System.out.println(id + "超時");
sseCache.remove(id);
});
sseEmitter.onCompletion(() -> System.out.println("完成!!!"));
return sseEmitter;
}
}/<code>

我們上面超時時間設置的比較短,用來測試下客戶端的自動重連,如下,開啟的日誌不斷增加

【SpringBoot WEB系列】SSE 服務器發送事件詳解

其次將 SseEmitter 的超時時間設長一點,再試一下數據推送功能

【SpringBoot WEB系列】SSE 服務器發送事件詳解

請注意上面的演示,當後端結束了長連接之後,客戶端會自動重新再次連接,不用寫額外的重試邏輯了,就這麼神奇

3. 小結

本篇文章介紹了 SSE 的相關知識點,並對比 websocket 給出了 sse 的優點(至於啥優點請往上翻)

請注意,本文雖然介紹了兩種 sse 的方式,第一種藉助異步請求來實現,如果需要完成 sse 的規範要求,需要自己做一些適配,如果需要了解 sse 底層實現原理的話,可以參考一下;在實際的業務開發中,推薦使用SseEmitter


源碼


  • 工程:https://github.com/liuyueyi/spring-boot-demo[1]
  • 項目源碼: https://github.com/liuyueyi/spring-boot-demo/blob/master/spring-boot/220-web-sse[2]


分享到:


相關文章: