使用 Spring 5 的 Webflux 開發 Reactive 應用

使用 Spring 5 的 Webflux 開發 Reactive 應用


對 Java 開發者來說,2017 年 9 月是個熱鬧的月份,Java SE 9、Java EE 8 相繼發佈,而 Spring 也在這段時間,發佈了 5.0 正式版本。

身為 Java 開發者,對於 Spring 框架並不陌生。Spring 在 Rod Johnson 十幾年前一個人單挑 J2EE 體系開始,到十年前開始大行其道至今,基本上已是 Java 開發領域的事實標準了。

Spring 5 中最重要改動是把反應式編程的思想應用到了框架的各個方面,Spring 5 的反應式編程以 Reactor 庫為基礎。Spring 5 框架所包含的內容很多,這裡只重點介紹其中新增的 WebFlux 模塊。我們可以使用 WebFlux 創建高性能的 Web 應用和客戶端。本文對 WebFlux 模塊進行了詳細介紹,包括其中的 HTTP、服務器推送事件(SSE)和 WebSocket 支持。

WebFlux 簡介

WebFlux 模塊的名稱是 spring-webflux,名稱中的 Flux 來源於 Reactor 中的類 Flux。該模塊中包含了對反應式 HTTP、服務器推送事件和 WebSocket 的客戶端和服務器端的支持。

對於開發人員來說,比較重要的是服務器端的開發,這也是本文的重點。在服務器端,WebFlux 支持兩種不同的編程模型:第一種是 Spring MVC 中使用的基於 Java 註解的方式;第二種是基於 Java 8 的 Lambda 表達式的函數式編程模型。這兩種編程模型只是在代碼編寫方式上存在不同。它們運行在同樣的反應式底層架構之上,因此在運行時是相同的。WebFlux 需要底層提供運行時的支持,WebFlux 可以運行在支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上,或是其他異步運行時環境,如 Netty 和 Undertow。

使用 Spring 5 的 Webflux 開發 Reactive 應用


Spring Boot 2 是基於 Spring 5 的,其中一個比較大的更新就在於支持包括 spring-webflux 和響應式的 spring-data 在內的響應式模塊,下邊的例子我們就用 Spring Boot 2 在進行搭建。

本文從三個方面對 WebFlux 進行介紹。首先是使用經典的基於 Java 註解的編程模型來進行開發,其次是使用 WebFlux 新增的函數式編程模型來進行開發,最後介紹 WebFlux 應用的測試。通過這樣循序漸進的方式讓讀者瞭解 WebFlux 應用開發的細節。

Java 註解編程模型

基於 Java 註解的編程模型,對於使用過 Spring MVC 的人來說是再熟悉不過的。在 WebFlux 應用中使用同樣的模式,容易理解和上手。我們先從最經典的 Hello World 開始說明。

我們通過 Spring Initializ 創建一個 Spring Boot 工程,因為目前我們還不涉及 DAO 層,所以只選擇 Reactive Web 就行了

使用 Spring 5 的 Webflux 開發 Reactive 應用


也可以使用網頁版的 https://start.spring.io 來創建項目:

使用 Spring 5 的 Webflux 開發 Reactive 應用


創建後的項目 POM 中,包含下邊的依賴,即表示基於 Spring WebFlux:

使用 Spring 5 的 Webflux 開發 Reactive 應用


創建 Controller 類HelloController,僅提供一個 Endpoint/hello:

使用 Spring 5 的 Webflux 開發 Reactive 應用


我們啟動應用,通過訪問 http://localhost:8080/hello 可以得到返回值Hello World!。

通過這個簡單的 Hello World 示例我們可以看到使用 WebFlux 與 Spring MVC 的不同在於,WebFlux 所使用的類型是與反應式編程相關的 Flux 和 Mono 等,而不是簡單的對象。對於簡單的 Hello World 示例來說,這兩者之間並沒有什麼太大的差別。對於複雜的應用來說,反應式編程和負壓的優勢會體現出來,可以帶來整體的性能的提升。

RESTful API

簡單的 Hello World 示例並不足以說明 WebFlux 的用法。在下面的小節中,本文將介紹其他具體的實例。先從 RESTful API 開始說起。RESTful API 在 Web 服務器端應用中佔據了很大的一部分。我們通過一個具體的實例來說明如何使用 WebFlux 來開發 RESTful API,該 RESTful API 用來對用戶數據進行基本的 CRUD 操作。

作為領域對象的User類中包含了id、name和email三個基本的屬性。

使用 Spring 5 的 Webflux 開發 Reactive 應用


為了對User類進行操作,我們需要提供服務類UserService。UserService使用一個 Map 來保存所有用戶的信息,並不是一個持久化的實現,這對於示例應用來說已經足夠了。

使用 Spring 5 的 Webflux 開發 Reactive 應用


UserController是具體的 Spring MVC 控制器類,它使用UserService來完成具體的功能。UserController中使用了註解@ExceptionHandler來添加了ResourceNotFoundException異常的處理方法,並返回 404 錯誤。UserController中的方法都很簡單,只是簡單地代理給UserService中的對應方法。

使用 Spring 5 的 Webflux 開發 Reactive 應用


我們可以通過訪問 http://127.0.0.1:8080/user 獲取到用戶信息,其他的 CRUD 操作可以自行測試

使用 Spring 5 的 Webflux 開發 Reactive 應用


服務器推送事件(SSE)

服務器推送事件(Server-Sent Events,SSE)允許服務器端不斷地推送數據到客戶端。相對於 WebSocket 而言,服務器推送事件只支持服務器端到客戶端的單向數據傳遞。雖然功能較弱,但優勢在於 SSE 在已有的 HTTP 協議上使用簡單易懂的文本格式來表示傳輸的數據。作為 W3C 的推薦規範,SSE 在瀏覽器端的支持也比較廣泛,除了 IE 之外的其他瀏覽器都提供了支持。在 IE 上也可以使用 polyfill 庫來提供支持。在服務器端來說,SSE 是一個不斷產生新數據的流,非常適合於用反應式流來表示。在 WebFlux 中創建 SSE 的服務器端是非常簡單的。只需要返回的對象的類型是 Flux,就會被自動按照 SSE 規範要求的格式來發送響應。

下面的SseController是一個使用 SSE 的控制器的示例。

使用 Spring 5 的 Webflux 開發 Reactive 應用


  1. 每 1 秒發出一個自增的 Long 值
  2. 使用ServerSentEvent.Builder來創建ServerSentEvent對象

在測試 SSE 時,我們只需要使用 curl 來訪問即可。

使用 Spring 5 的 Webflux 開發 Reactive 應用


WebSocket

WebSocket 支持客戶端與服務器端的雙向通訊。當客戶端與服務器端之間的交互方式比較複雜時,可以使用 WebSocket。WebSocket 在主流的瀏覽器上都得到了支持。

WebFlux 也對創建 WebSocket 服務器端提供了支持。在服務器端,我們需要實現接口org.springframework.web.reactive.socket.WebSocketHandler來處理 WebSocket 通訊,其handle方法的參數是WebSocketSession對象,可以用來獲取客戶端信息、接送消息和發送消息。

下面的EchoHandler對於每個接收到的消息,都會在其前邊添加一個前綴Echo ->再發送出去。WebSocketSession的receive()方法的返回值是一個Flux<websocketmessage>對象,表示的是接收到的消息流,而send()方法的參數是一個Publisher<websocketmessage>對象,表示要發送的消息流。/<websocketmessage>/<websocketmessage>

使用 Spring 5 的 Webflux 開發 Reactive 應用


僅創建一個WebSocketHandler是不夠的,我們還需要把它註冊到 WebFlux 中。

我們再來需要創建一個WebSocketHandlerAdapter對象,該對象負責把 WebSocketHandler 關聯到 WebFlux 中。其中我們使用HandlerMapping把EchoHandler映射到/echo端點。

使用 Spring 5 的 Webflux 開發 Reactive 應用


運行應用之後,可以使用工具來測試該 WebSocket 服務。

打開工具頁面,然後連接到ws://localhost:8080/echo可以發送消息並查看服務器端返回的結果。

使用 Spring 5 的 Webflux 開發 Reactive 應用


函數式編程模型

前面介紹了基於 Java 註解的編程模型,WebFlux 還支持基於 Lambda 表達式的函數式編程模型。與基於 Java 註解的編程模型相比,函數式編程模型的抽象層次更低,代碼編寫更靈活,可以滿足一些對動態性要求更高的場景。不過在編寫時的代碼複雜度也較高,學習曲線也較陡。

目前 Spring Boot 已支持在一個應用中同時使用兩種不同的編程模式,我們可以根據實際的需要來選擇合適的編程模型。

在函數式編程模型中,每個請求是由一個函數來處理的, 通過接口org.springframework.web.reactive.function.server.HandlerFunction來表示。HandlerFunction是一個函數式接口,其中只有一個方法,因此可以用 Labmda 表達式來實現該接口:

使用 Spring 5 的 Webflux 開發 Reactive 應用


然後通過函數式接口org.springframework.web.reactive.function.server.RouterFunction來為這些HandlerFunction提供路由信息,輸入為請求,輸出為裝在 Mono 裡邊的Handlerfunction

使用 Spring 5 的 Webflux 開發 Reactive 應用


我們看到,在 WebFlux 中,請求和響應不再是 WebMVC 中的ServletRequest和ServletResponse,而是ServerRequest和ServerResponse。後者是在響應式編程中使用的接口,它們提供了對非阻塞和回壓特性的支持,以及 HTTP 消息體與響應式類型 Mono 和 Flux 的轉換方法。

下面我們用函數式的方式開發一個簡單的計算器,有add、subtract、multiply和divide四個方法,都是接口HandlerFunction的實現,分別對應加、減、乘、除四種運算。

對於這個需求,HandlerFunction 很容易寫:

使用 Spring 5 的 Webflux 開發 Reactive 應用


那麼 RouterFunction 為:

使用 Spring 5 的 Webflux 開發 Reactive 應用


啟動服務,訪問 http://localhost:8080/add?v1=2&v2=3 即計算 2+3 得到返回值5。

不過這麼寫在業務邏輯複雜的時候不太好組織,我們通常採用跟 MVC 類似的代碼組織方式,將同類業務的 HandlerFunction 放在一個類中,然後在 Java Config 中將 RouterFunction 配置為 Spring 容器的 Bean。我們繼續在這個計算器的代碼上開發:

使用 Spring 5 的 Webflux 開發 Reactive 應用


我們採用 Spring 現在比較推薦的 Java Config 的配置 Bean 的方式,創建用於存放 Router 的配置類RouterConfig.java:

使用 Spring 5 的 Webflux 開發 Reactive 應用


在上邊的代碼中,我們首先用RouterFunctions.route來根據Predicate是否匹配來確定HandlerFunction是否被應用。RequestPredicates中包含了很多靜態方法來創建常用的基於不同匹配規則的Predicate,如RequestPredicates.path用來根據 HTTP 請求的路徑來進行匹配,此處我們檢查請求的路徑是/calculator。然後使用ServerRequest的queryParam方法來獲取到參數operator的值,然後通過反射 API 在CalculatorHandler中找到與參數operator的值名稱相同的方法來確定要調用的HandlerFunction的實現,最後調用查找到的方法來處理該請求。如果找不到參數operator,服務器端返回 400 錯誤;如果反射 API 的方法調用中出現錯誤,服務器端返回 500 錯誤。

重啟服務然後我們訪問 http://127.0.0.1:8080/calculator?operator=add&v1=2&v2=3 得到返回值5。

客戶端

除了服務器端實現之外,WebFlux 也提供了反應式客戶端,可以訪問 HTTP、SSE 和 WebSocket 服務器端。

HTTP

對於 HTTP 和 SSE,可以使用 WebFlux 模塊中的類org.springframework.web.reactive.function.client.WebClient。下面的代碼中我們將用RESTClient來訪問前面小節中創建的 REST API。

使用 Spring 5 的 Webflux 開發 Reactive 應用


  1. 使用WebClient.create方法來創建一個新的WebClient對象
  2. 使用post方法來創建一個 POST 請求
  3. 指定 baseUrl
  4. 配置請求 Header:Content-Type: application/json
  5. 使用body()方法來設置 POST 請求的內容
  6. 異步地獲取 response 信息,返回值為WebClient.ResponseSpec,retrive()可以看做是exchange()方法的 “快捷版”(exchange()的返回值為ClientResponse)
  7. WebClient.ResponseSpec的bodyToFlux方法把響應內容轉換成User對象,最終得到的結果是Flux<user>
  8. 打印出來
  9. 由於是異步的,我們將測試線程 sleep 1 秒確保拿到 response,也可以用CountDownLatch


使用 Spring 5 的 Webflux 開發 Reactive 應用


SEE

WebClient 還可以用同樣的方式來訪問 SSE 服務。這裡我們訪問的是在之前的小節中創建的生成隨機數的 SSE 服務。使用 WebClient 訪問 SSE 在發送請求部分與訪問 REST API 是相同的,所不同的地方在於對 HTTP 響應的處理。

使用 Spring 5 的 Webflux 開發 Reactive 應用


  1. 由於 SSE 服務的響應是一個消息流,我們需要使用flatMapMany把 Mono<serverresponse>轉換成一個Flux<serversentevent>對象,這是通過方法BodyExtractors.toFlux來完成的,其中的參數new ParameterizedTypeReference<serversentevent>>() {}表明了響應消息流中的內容是ServerSentEvent對象/<serversentevent>/<serversentevent>/<serverresponse>
  2. 由於 SSE 服務器會不斷地發送消息,這裡我們只是通過buffer方法來獲取前 10 條消息並輸出
  3. 只讀地 peek 每個元素,然後打印出來,它並不是 subscribe,所以不會觸發流
  4. blockFirst方法,顧名思義,在收到第一個元素前會阻塞,響應式業務場景中慎用

運行效果如下:

使用 Spring 5 的 Webflux 開發 Reactive 應用


或者也可以像下邊這樣

使用 Spring 5 的 Webflux 開發 Reactive 應用


運行效果如下:

使用 Spring 5 的 Webflux 開發 Reactive 應用


WebSocket

訪問 WebSocket 不能使用 WebClient,而應該使用專門的 WebSocketClient 客戶端。Spring Boot 的 WebFlux 模板中默認使用的是 Reactor Netty 庫。Reactor Netty 庫提供了 WebSocketClient 的實現。我們這裡訪問前面創建的 WebSocket 服務。

使用 Spring 5 的 Webflux 開發 Reactive 應用


  1. 創建一個 WebSocketClient 實例
  2. 使用 WebSocketClient 的execute方法與 WebSocket 服務器建立連接,並執行給定的 WebSocketHandler 對象
  3. 通過 WebSocketSession 的send方法來發送字符串”Hello” 到服務器端
  4. 通過receive方法來等待服務器端的響應並輸出,take(1)的作用是表明客戶端只獲取服務器端發送的第一條消息

運行效果如下:

使用 Spring 5 的 Webflux 開發 Reactive 應用


測試

在 spring-test 模塊中也添加了對 WebFlux 的支持。通過類 org.springframework.test.web.reactive.server.WebTestClient可以測試 WebFlux 服務器。進行測試時既可以通過 mock 的方式來進行,也可以對實際運行的服務器進行集成測試。

我們通過一個集成測試來測試UserController中的創建用戶的功能。方法 WebTestClient.bindToServer綁定到一個運行的服務器並設置了基礎 URL。發送 HTTP 請求的方式與之前的代碼相同,不同的是exchange方法的返回值是ResponseSpec對象,其中包含了expectStatus和expectBody等方法來驗證 HTTP 響應的狀態碼和內容。方法jsonPath可以根據 JSON 對象中的路徑來進行驗證。

使用 Spring 5 的 Webflux 開發 Reactive 應用


JsonPath 的語法可以看 https://github.com/json-path/JsonPath


總結

反應式編程範式為開發高性能 Web 應用帶來了新的機會和挑戰。Spring 5 中的 WebFlux 模塊可以作為開發反應式 Web 應用的基礎。由於 Spring 框架的流行,WebFlux 會成為開發 Web 應用的重要趨勢之一。本文對 Spring 5 中的 WebFlux 模塊進行了詳細的介紹,包括如何用 WebFlux 開發 HTTP、SSE 和 WebSocket 服務器端應用,以及作為客戶端來訪問 HTTP、SSE 和 WebSocket 服務。對於 WebFlux 的基於 Java 註解和函數式編程等兩種模型都進行了介紹。最後介紹瞭如何測試 WebFlux 應用。

示例代碼:https://github.com/zhaoyibo/spring-webflux

作者: Windmt來源: Windmt


分享到:


相關文章: