Spark Streaming狀態管理函數的選擇比較

一、updateStateByKey

官網原話:

Spark Streaming狀態管理函數的選擇比較

也即是說它會統計全局的key的狀態,就算沒有數據輸入,它也會在每一個批次的時候返回之前的key的狀態。

缺點:若數據量太大的話,需要checkpoint的數據會佔用較大的存儲,效率低下。

程序示例如下:

Spark Streaming狀態管理函數的選擇比較

二、mapWithState

mapWithState:也是用於全局統計key的狀態,但是它如果沒有數據輸入,便不會返回之前的key的狀態,有一點增量的感覺。效率更高,生產中建議使用

官方代碼如下

Spark Streaming狀態管理函數的選擇比較

三、源碼分析

upateStateByKey:

  • map返回的是MappedDStream,而MappedDStream並沒有updateStateByKey方法,並且它的父類DStream中也沒有該方法。但是DStream的伴生對象中有一個隱式轉換函數:
Spark Streaming狀態管理函數的選擇比較

跟進去 PairDStreamFunctions ,發現最終調用的是自己的updateStateByKey。

其中updateFunc就要傳入的參數,他是一個函數,Seq[V]表示當前key對應的所有值,

Option[S] 是當前key的歷史狀態,返回的是新的狀態。

Spark Streaming狀態管理函數的選擇比較

最終調用:

Spark Streaming狀態管理函數的選擇比較

再跟進去 new StateDStream:

在這裡面new出了一個StateDStream對象。在其compute方法中,會先獲取上一個batch計算出的RDD(包含了至程序開始到上一個batch單詞的累計計數),然後在獲取本次batch中StateDStream的父類計算出的RDD(本次batch的單詞計數)分別是prevStateRDD和parentRDD,然後在調用 computeUsingPreviousRDD 方法:

Spark Streaming狀態管理函數的選擇比較

在這裡兩個RDD進行cogroup然後應用updateStateByKey傳入的函數。我們知道cogroup的性能是比較低下,參考【http://lxw1234.com/archives/2015/07/384.htm】。

mapWithState:

Spark Streaming狀態管理函數的選擇比較

說明:StateSpec 封裝了狀態管理函數,並在該方法中創建了MapWithStateDStreamImpl對象。

MapWithStateDStreamImpl 中創建了一個InternalMapWithStateDStream類型對象internalStream,在MapWithStateDStreamImpl的compute方法中調用了internalStream的getOrCompute方法。

Spark Streaming狀態管理函數的選擇比較

InternalMapWithStateDStream中沒有getOrCompute方法,這裡調用的是其父類 DStream 的getOrCpmpute方法,該方法中最終會調用InternalMapWithStateDStream的Compute方法:

Spark Streaming狀態管理函數的選擇比較

根據給定的時間生成一個MapWithStateRDD,首先獲取了先前狀態的RDD:preStateRDD和當前時間的RDD:dataRDD,然後對dataRDD基於先前狀態RDD的分區器進行重新分區獲取partitionedDataRDD。最後將preStateRDD,partitionedDataRDD和用戶定義的函數mappingFunction傳給新生成的MapWithStateRDD對象返回。

後續若有興趣可以繼續跟進MapWithStateRDD的compute方法,限於篇幅不再展示。


分享到:


相關文章: