JDK9新特性 Reactive Stream 響應式流

JDK9新特性 Reactive Stream 響應式流

 本篇主要講解 JDK9特性 Reactive Stream 響應式流,介紹 Reactive Stream是什麼 背壓是什麼,以及JDK9中提供的關於Reactive Stream的接口和 2個使用案例包括如何使用Processor。


 1.Reactive Stream 概念

 Reactive Stream (響應式流/反應流) 是JDK9引入的一套標準,是一套基於發佈/訂閱模式的數據處理規範。響應式流從2013年開始,作為提供非阻塞背壓的異步流處理標準的倡議。 它旨在解決處理元素流的問題——如何將元素流從發佈者傳遞到訂閱者,而不需要發佈者阻塞,或訂閱者有無限制的緩衝區或丟棄。更確切地說,Reactive流目的是“找到最小的一組接口,方法和協議,用來描述必要的操作和實體以實現這樣的目標:以非阻塞背壓方式實現數據的異步流”。


反應式流 (Reactive Stream) 規範誕生,定義瞭如下四個接口:

<code>    Subscription 接口定義了連接發佈者和訂閱者的方法
Publisher 接口定義了發佈者的方法
Subscriber 接口定義了訂閱者的方法
Processor 接口定義了處理器
/<code>

Reactive Stream 規範誕生後,RxJava 從 RxJava 2 開始實現 Reactive Stream 規範 , 同時 Spring提供的Reactor 框架(WebFlux的基礎) 等也相繼實現了 Reactive Stream 規範

 下圖展示了訂閱者和發佈者之間的交互

JDK9新特性 Reactive Stream 響應式流


 2.背壓(back pressure)概念

 如果生產者發出的信息比消費者能夠處理消息最大量還要多,消費者可能會被迫一直在抓消息,耗費越來越多的資源,埋下潛在的崩潰風險。為了防止這一點,需要有一種機制使消費者可以通知生產者,降低消息的生成速度。生產者可以採用多種策略來實現這一要求,這種機制稱為背壓。

簡單來說就是

  • 背壓指的發佈者和訂閱者之間的互動
  • 訂閱者可以告訴發佈者自己需要多少數據,可以調節數據流量,不會導致發佈者發佈數據過多導致數據浪費或壓垮訂閱者

 3.JDK9中 Reactive Stream規範的實現

 JDK9中Reactive Stream的實現規範 通常被稱為 Flow API ,通過java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 類來實現響應式流

 在JDK9裡Reactive Stream的主要接口聲明在Flow類裡,Flow 類中定義了四個嵌套的靜態接口,用於建立流量控制的組件,發佈者在其中生成一個或多個供訂閱者使用的數據項:

  • Publisher:數據項發佈者、生產者
  • Subscriber:數據項訂閱者、消費者
  • Subscription:發佈者與訂閱者之間的關係紐帶,訂閱令牌
  • Processor:數據處理器

JDK9新特性 Reactive Stream 響應式流


  3.1 發佈者 Publisher

  Publisher 將數據流發佈給註冊的 Subscriber。 它通常使用 Executor 異步發佈項目給訂閱者。 Publisher 需要確保每個訂閱的 Subscriber 方法嚴格按順序調用。

  • subscribe:訂閱者訂閱發佈者
<code>        @FunctionalInterface 
public static interface Flow.Publisher {
public void subscribe(Subscriber super T> subscriber);
}

/<code>

  3.2 訂閱者 Subscriber

  Subscriber 訂閱 Publisher 的數據流,並接受回調。 如果 Subscriber 沒有發出請求,就不會收到數據。對於給定 訂閱合同(Subscription),調用 Subscriber 的方法是嚴格按順序的。

  • onSubscribe:發佈者調用訂閱者的這個方法來異步傳遞訂閱 , 這個方法在 publisher.subscribe方法調用後被執行
  • onNext:發佈者調用這個方法傳遞數據給訂閱者
  • onError:當 Publisher 或 Subscriber 遇到不可恢復的錯誤時調用此方法,之後不會再調用其他方法
  • onComplete:當數據已經發送完成,且沒有錯誤導致訂閱終止時,調用此方法,之後不再調用其他方法

  3.3 訂閱合同 Subscription

  Subscription 用於連接 Publisher 和 Subscriber。Subscriber 只有在請求時才會收到項目,並可以通過 Subscription 取消訂閱。Subscription 主要有兩個方法:

  • request:訂閱者調用此方法請求數據
  • cancel:訂閱者調用這個方法來取消訂閱,解除訂閱者與發佈者之間的關係
<code>        public static interface Flow.Subscription {
public void request(long n);
public void cancel();
}
/<code>

  3.4 處理器 Processor

  Processor 位於 Publisher 和 Subscriber 之間,用於做數據轉換。可以有多個 Processor 同時使用,組成一個處理鏈,鏈中最後一個處理器的處理結果發送給 Subscriber。JDK 沒有提供任何具體的處理器。處理器同時是訂閱者和發佈者,接口的定義也是繼承了兩者 即作為訂閱者也作為發佈者 ,作為訂閱者接收數據,然後進行處理,處理完後作為發佈者,再發布出去。

<code>    /**
* A component that acts as both a Subscriber and Publisher.
*
* @param the subscribed item type
* @param the published item type
*/
public static interface Processor extends Subscriber, Publisher {
}
![Xnip20200225_133449.png](http://cdn.askajohnny.com/Xnip2020-02-25_13-34-49.png)

/<code>

 4.JDK9 中Reactive Stream(Flow API )規範調用流程

 Publisher是能夠發出元素的發佈者,Subscriber是接收元素並做出響應的訂閱者。當執行Publisher裡的subscribe方法時,發佈者會回調訂閱者的onSubscribe方法,這個方法中,通常訂閱者會藉助傳入的Subscription向發佈者請求n個數據。然後發佈者通過不斷調用訂閱者的onNext方法向訂閱者發出最多n個數據。如果數據全部發完,則會調用onComplete告知訂閱者流已經發完;如果有錯誤發生,則通過onError發出錯誤數據,同樣也會終止流。

 其中,Subscription相當於是連接Publisher和Subscriber的“紐帶(合同)”。因為當發佈者調用subscribe方法註冊訂閱者時,會通過訂閱者的回調方法onSubscribe傳入Subscription對象,之後訂閱者就可以使用這個Subscription對象的request方法向發佈者“要”數據了。

背壓機制正是基於此來實現的

JDK9新特性 Reactive Stream 響應式流


 5.案例一 響應式基礎使用案例


  5.1 以下代碼簡單演示了SubmissionPublisher 和這套發佈-訂閱框架的基本使用方式:

  注意要使用JDK9以上的版本


<code>    /**
* @author johnny
* @create 2020-02-24 下午5:44
**/
@Slf4j
public class ReactiveStreamTest {
/<code>
<code>    public static void main(String[] args) throws InterruptedException {
/<code>
<code>        //1.創建 生產者Publisher JDK9自帶的 實現了Publisher接口
SubmissionPublisher<integer> publisher = new SubmissionPublisher<>();/<integer>/<code>
<code>        //2.創建 訂閱者 Subscriber,需要自己去實現內部方法/<code>
<code>        Flow.Subscriber<integer> subscriber = new Flow.Subscriber<>() {/<integer>/<code>
<code>            private Flow.Subscription subscription;/<code>
<code>            @Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("訂閱成功。。");
subscription.request(1);
System.out.println("訂閱方法裡請求一個數據");
}/<code>
<code>            @Override
public void onNext(Integer item) {
log.info("【onNext 接受到數據 item : {}】 ", item);
subscription.request(1);
}/<code>
<code>            @Override
public void onError(Throwable throwable) {
log.info("【onError 出現異常】");
subscription.cancel();
}/<code>
<code>            @Override
public void onComplete() {

log.info("【onComplete 所有數據接收完成】");
}
};/<code>
<code>        //3。發佈者和訂閱者 建立訂閱關係 就是回調訂閱者的onSubscribe方法傳入訂閱合同
publisher.subscribe(subscriber);
/<code>
<code>        //4.發佈者 生成數據
for (int i = 1; i <= 5; i++) {
log.info("【生產數據 {} 】", i );
//submit是一個阻塞方法,此時會調用訂閱者的onNext方法
publisher.submit(i);
}
/<code>
<code>        //5.發佈者 數據都已發佈完成後,關閉發送,此時會回調訂閱者的onComplete方法
publisher.close();/<code>
<code>        //主線程睡一會
Thread.currentThread().join(100000);
/<code>
<code>      }
}/<code>

打印輸出結果

JDK9新特性 Reactive Stream 響應式流

 **看結果好像我們看不出來Reactive Stream有什麼用 ,其實關鍵點在 publisher.submit(i); submit它是一個阻塞方法讓我們把代碼修改一點**

1.將onNext添加耗時操作,模擬業務耗時邏輯2.增加發布者發佈數據的數量,模擬真實場景 無限數據


<code>        @Override
public void onNext(Integer item) {
log.info("【onNext 接受到數據 item : {}】 ", item);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1);
}/<code>
<code>        //發佈者 生成數據
for (int i = 1; i <= 1000; i++) {
log.info("【生產數據 {} 】", i );
//submit是一個阻塞方法,此時會調用訂閱者的onNext方法
publisher.submit(i);
}/<code>

直接看打印

 **會發現發佈者 生成數據到256後就會停止生產,這是因為publisher.submit(i)方法是阻塞的,內部有個緩衝數組最大容量就是256,只有當訂閱者發送 subscription.request(1); 請求後,才會從緩衝數組裡拿按照順序拿出數據傳給 onNext方法 供訂閱者處理,當subscription.request(1)這個方法被調用後,發佈者發現數組裡沒有滿才會再生產數據,這樣就防止了生產者一次生成過多的數據把訂閱者壓垮,從而實現了背壓機制**

JDK9新特性 Reactive Stream 響應式流


 6.案例二 響應式帶 Processor 使用案例


  6.1創建自定義Processor


<code>    package com.johnny.webflux.webfluxlearn.reactivestream;/<code>
<code>    import lombok.extern.slf4j.Slf4j;/<code>
<code>    import java.util.concurrent.Flow; 

import java.util.concurrent.SubmissionPublisher;/<code>
<code>    /**
* 自定義 Processor
*
* @author johnny
* @create 2020-02-25 下午1:56
**/
@Slf4j
public class MyProcessor extends SubmissionPublisher<integer> implements Flow.Processor<integer> {/<integer>/<integer>/<code>
<code>    private Flow.Subscription subscription;/<code>
<code>    @Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("【Processor 收到訂閱請求】");
//保存訂閱關係,需要用它來給發佈者 相應
this.subscription = subscription;/<code>
<code>        this.subscription.request(1);
}/<code>
<code>    @Override
public void onNext(Integer item) {
log.info("【onNext 收到發佈者數據 : {} 】", item);/<code>
<code>        //做業務處理。。
if (item % 2 == 0) {
//篩選偶數 發送給 訂閱者
this.submit(item);
}
this.subscription.request(1);
}/<code>
<code>    @Override
public void onError(Throwable throwable) {
// 我們可以告訴發佈者, 後面不接受數據了
this.subscription.cancel();
}/<code>
<code>    @Override
public void onComplete() {
log.info("【處理器處理完畢】");

this.close();
}
}
/<code>

  6.2 運行demo 關聯publisher 和 Processor 和 subscriber


<code>    package com.johnny.webflux.webfluxlearn.reactivestream;/<code>
<code>    import lombok.extern.slf4j.Slf4j;/<code>
<code>    import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;/<code>
<code>    /**
* 帶Processor的案例
*
* @author johnny
* @create 2020-02-25 下午2:17
**/
@Slf4j
public class ProcessorDemo {/<code>
<code>    public static void main(String[] args) throws InterruptedException {
/<code>
<code>        //創建發佈者
SubmissionPublisher<integer> publisher = new SubmissionPublisher<>();
/<integer>/<code>
<code>        //創建 Processor 即是發佈者也是訂閱者
MyProcessor myProcessor = new MyProcessor();
/<code>
<code>        //創建最終訂閱者
Flow.Subscriber<integer> subscriber = new Flow.Subscriber<>() {/<integer>/<code>
<code>            private Flow.Subscription subscription;/<code>
<code>            @Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);

}/<code>
<code>            @Override
public void onNext(Integer item) {
log.info("【onNext 從Processor 接受到過濾後的 數據 item : {}】 ", item);
this.subscription.request(1);
}/<code>
<code>            @Override
public void onError(Throwable throwable) {
log.info("【onError 出現異常】");
subscription.cancel();
}/<code>
<code>            @Override
public void onComplete() {
log.info("【onComplete 所有數據接收完成】");
}
};/<code>
<code>        //建立關係 發佈者和處理器, 此時處理器扮演 訂閱者
publisher.subscribe(myProcessor);/<code>
<code>        //建立關係 處理器和訂閱者  此時處理器扮演
myProcessor.subscribe(subscriber);/<code>
<code>        //發佈者發佈數據/<code>
<code>        publisher.submit(1);
publisher.submit(2);
publisher.submit(3);
publisher.submit(4);/<code>
<code>        publisher.close();/<code>
<code>        TimeUnit.SECONDS.sleep(2);/<code>
<code>      }
}/<code>
JDK9新特性 Reactive Stream 響應式流

 7.總結

本篇主要講解 JDK9特性 Reactive Stream 響應式流,介紹 Reactive Stream是什麼 背壓是什麼,以及JDK9中提供的關於Reactive Stream的接口和 2個使用案例包括如何使用Processor。

只需要關注JDK9提供的 4個接口,以及內部的方法,對著案例敲一遍代碼 其實流程還是很簡單的 加油吧!!!


個人博客網站 https://www.askajohnny.com 歡迎訪問!

本文由博客一文多發平臺 https://openwrite.cn?from=article_bottom 發佈!


分享到:


相關文章: