學習Flink的ProcessFunction過程中,官方文檔中涉及狀態處理的時候,不止一次提到只適用於keyed stream的元素,如下圖紅框所示:
之前寫過一些flink應用,keyed stream常用但不是必須用的,所以產生了疑問:
為何只有keyed stream的元素能讀寫狀態?每個key對應的狀態是如何操作的?Flink的"狀態"
先去回顧Flink"狀態"的知識點;官方文檔說就兩種狀態:keyed state和operator state:如上圖,keyed stream的元素是具有key的特徵,與ProcessFunction的操作狀態時要求匹配,其他steam的元素由於沒有key的特徵,所以也就沒有"狀態"一說了;另一種狀態是Operator State,如下圖,這是和多並行度計算時的算子實例綁定的,例如當前算子消費kafka的某個分區的最新offset,而ProcessFunction是用來處理stream元素的,不會涉及到Operator State:
官方demo
為了學習ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html ,簡單說說這個demo的功能:
數據源在不間斷的產生單詞,每個單詞對應一個Tuple2<string>的實例;/<string>數據源被keyBy方法轉成KeyedStream,key是Tuple2實例的f0字段;一個KeyedProcessFunction的子類CountWithTimeoutFunction,被用來處理KeyedStream的每個元素,處理的邏輯:為每個key維護一個狀態,狀態的內容是這個key的出現次數和最後一次出現時間;如果那個key連續一分鐘沒有出現,KeyedProcessFunction就向下遊發送這個元素;以上就是官方demo的功能,本來是想通過demo來加深認識,結果看完不但沒有明白,反而更暈了,下圖是我對demo代碼的疑惑:
從上圖可見我的疑惑,這裡再複述一下:
入參value是Tuple2類型,假設其f0字段等於aaa,那麼processElement方法的作用,就是取出aaa的狀態,更新後保存;從代碼上看,state.value()返回了aaa的狀態,這個value方法並沒有將aaa作為入參,那怎麼做到返回aaa的狀態呢?如果下一個入參value的f0字段等於bbb了,這個state.value()能返回bbb的狀態嗎?對更新狀態的代碼state.update(current)也是同樣的疑惑;然後又產生了新的疑惑:成員變量state難道是一直在變?每執行一次processElement,都會變成該key對應的state實例?先反思為何會有上述疑惑
上述疑惑產生的原因,應該是受到平時使用HashMap的影響,HashMap獲取值就是在調用get方法時指定key,設置值也是在put時指定key,所以看到state.value()方法沒有用key做入參就不習慣了要消除這種不適應,要做的第一件事就是提醒自己:processElement是在框架內運行的,很多數據在之前已經由框架準備好了;接下來要做的,就是把框架準備數據跟蹤源碼
如下圖,讓我們從一個斷點的堆棧開始吧,這是在執行上面demo中的processElement方法之前的一個斷點,可見根源是個線程的run方法,也就是KeyedProcessFunction對應的算子執行任務的線程:得益於Flink代碼自身規範、清晰的設計和實現,再加上IDEA強大的debug功能,整個閱讀和分析過程十分順利,這其中的收穫會逐漸在今後深入學習DataStreamAPI的過程中見效;
最後,根據上面的分析過程繪製了一幅簡陋的流程圖,希望能幫助您加快理解: