Reactive Streams
Reactive Streams是一個使用非阻塞背壓機制的異步流處理標準。
back pressure(背壓)是其中的關鍵概念。在異步模式中,消費者訂閱生產者,從生產者那裡獲取數據,需要提供回調方法,當生產者產生新的可用數據後,就調用回調方法。當生產者發送數據的速度大於消費者處理的速度時,消費者就會搶佔更多的資源來處理,並且有崩潰的可能。為了防止這種問題,需要一種機制,能讓消費者通知生產者:生產速度太快了需要減速,然後生產者可以進行相應調整。這個機制就叫做背壓。
背壓可以分為阻塞和非阻塞。
阻塞比較簡單,例如生產者和消費者運行在同一個線程中,一個執行、另一個阻塞,意味著當消費者執行時,生產者不會發送新的數據。
非阻塞的方式是把 推模式改為了 拉模式,推模式是生產者來決定,生產者儘快的把數據發給消費者,拉模式是消費者來決定,消費者向生產者請求一定數量的數據,生產者會按照這個數量發送,在下次請求到來之前就是等待。
API 中的重要類型
Publisher
生產數據,供訂閱者消費,只有一個方法 subscribe(Subscriber)
Subscriber
訂閱生產者,接收數據(通過 onNext(T)方法)、錯誤信息( onError(Throwable)方法)、沒有更多數據的信號( onComplete()),在這些動作之前,publisher 會調用 onSubscription(Subscription)。
Subscription
是發佈者和訂閱者之間的連接,訂閱者會通過它來請求更多的數據( request(long)),或者中斷連接( cancel())。
整體流程
創建一個 Publisher 和一個 Subscriber
通過 Publisher::subscribe 關聯訂閱者
發佈者創建一個 Subscription 然後調用 Subscriber::onSubscription,這樣訂閱連接就建立起來了
訂閱者調用 Subscription::request 請求一定數量的數據
發佈者調用
Subscriber::onNext 向訂閱者傳遞數據,數據量不會超過訂閱者指定的數量當發佈者沒有更多數據時會調用 Subscriber::onComplete,如果出錯就調用 Subscriber::onError
訂閱者可以繼續請求更多的數據,或者通過 Subscription::cancel 關閉連接
可以看到,訂閱者調用 Subscription::request主動請求,這就是對非阻塞背壓的實現。
閱讀更多 性能與架構 的文章