05.24 Java 9 新特性:Reactive Streams

Java 9 新特性:Reactive Streams

Reactive Streams

Reactive Streams是一個使用非阻塞背壓機制的異步流處理標準。

back pressure(背壓)是其中的關鍵概念。在異步模式中,消費者訂閱生產者,從生產者那裡獲取數據,需要提供回調方法,當生產者產生新的可用數據後,就調用回調方法。當生產者發送數據的速度大於消費者處理的速度時,消費者就會搶佔更多的資源來處理,並且有崩潰的可能。為了防止這種問題,需要一種機制,能讓消費者通知生產者:生產速度太快了需要減速,然後生產者可以進行相應調整。這個機制就叫做背壓

背壓可以分為阻塞非阻塞

阻塞比較簡單,例如生產者和消費者運行在同一個線程中,一個執行、另一個阻塞,意味著當消費者執行時,生產者不會發送新的數據。

非阻塞的方式是把 推模式改為了 拉模式,推模式是生產者來決定,生產者儘快的把數據發給消費者,拉模式是消費者來決定,消費者向生產者請求一定數量的數據,生產者會按照這個數量發送,在下次請求到來之前就是等待。

API 中的重要類型

Publisher

生產數據,供訂閱者消費,只有一個方法 subscribe(Subscriber)

Subscriber

訂閱生產者,接收數據(通過 onNext(T)方法)、錯誤信息( onError(Throwable)方法)、沒有更多數據的信號( onComplete()),在這些動作之前,publisher 會調用 onSubscription(Subscription)

Subscription

是發佈者和訂閱者之間的連接,訂閱者會通過它來請求更多的數據( request(long)),或者中斷連接( cancel())。

整體流程

  • 創建一個 Publisher 和一個 Subscriber

  • 通過 Publisher::subscribe 關聯訂閱者

  • 發佈者創建一個 Subscription 然後調用 Subscriber::onSubscription,這樣訂閱連接就建立起來了

  • 訂閱者調用 Subscription::request 請求一定數量的數據

  • 發佈者調用

    Subscriber::onNext 向訂閱者傳遞數據,數據量不會超過訂閱者指定的數量

  • 當發佈者沒有更多數據時會調用 Subscriber::onComplete,如果出錯就調用 Subscriber::onError

  • 訂閱者可以繼續請求更多的數據,或者通過 Subscription::cancel 關閉連接

可以看到,訂閱者調用 Subscription::request主動請求,這就是對非阻塞背壓的實現。


分享到:


相關文章: