有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量

有狀態流處理:Apache Flink狀態後端

Apache Flink的3個狀態後端,它們的侷限性以及何時根據特定於案例的要求使用它們。


通過有狀態流處理,當開發人員啟用Flink應用程序的檢查點時,狀態將持續存在以防止數據丟失並確保在發生故障時完全恢復。為應用程序選擇狀態後端將影響狀態持久化的方式和位置。
瞭解管理Apache Flink狀態和開發有狀態流應用程序的實際示例。
Apache Flink附帶三個可用的狀態後端:MemoryStateBackend,FsStateBackend和RocksDBStateBackend。


有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量

該MemoryStateBackend

MemoryStateBackend是一個內部狀態後端,用於維護Java堆上的狀態。鍵/值狀態和窗口運算符包含存儲值和計時器的哈希表。
當您的應用程序檢查點時,此後端將在您將狀態發送到Apache Flink的作業管理器之前拍攝您的狀態的快照,該作業管理器也將其存儲在Java堆上。
默認情況下,MemoryStateBackend配置為支持異步快照。異步快照可避免可能導致流應用程序背壓的潛在阻塞管道。

使用MemoryStateBackend時需要注意什麼:

  • 默認情況下,每個單獨的狀態的大小默認為5 MB。您可以在MemoryStateBackend構造函數中進一步增加大小。
  • 狀態大小受akka幀大小的限制,無論您在配置中設置為最大狀態大小,都不能大於akka幀大小(您可以在配置中找到更多信息)。
  • 聚合狀態必須適合JobManager內存。

何時使用MemoryStateBackend:

  • 建議使用MemoryStateBackend進行本地開發或調試,因為它的狀態有限
  • MemoryStateBackend最適合具有小狀態大小的用例和有狀態流處理應用程序,例如僅包含一次記錄功能(Map,FlatMap或Filter)的作業或使用Kafka 使用者。


該FsStateBackend

FsStateBackend配置使用文件系統完成,例如URL(類型,地址,路徑)。一些示例文件系統可能是:

  • “hdfs://namenode:40010/flink/checkpoints” 或
  • “s3://flink/checkpoints”.

當您選擇FsStateBackend時,正在進行的數據保存在任務管理器的內存中。在檢查點上,此後端將狀態快照寫入配置的文件系統和目錄中的文件,同時它將在JobManager的內存或Zookeeper中存儲最少的元數據(對於高可用性情況)。
默認情況下,FsStateBackend配置為提供異步快照,以避免在寫入狀態檢查點時阻塞處理管道。可以通過將構造函數中相應的布爾標誌設置為false來實例化FsStateBackend來禁用該功能,例如:

1

new FsStateBackend(path, false);

何時使用FsStateBackend:

  • FsStateBackend最適合處理大狀態,長窗口或大鍵/值狀態的Apache Flink有狀態流處理作業。
  • FsStateBackend最適合每個高可用性設置。

該RocksDBStateBackend

使用文件系統(類型,地址,路徑)執行RocksDBStateBackend的配置,如下例所示:

  • “hdfs://namenode:40010/flink/checkpoints”或
  • “s3://flink/checkpoints”

RocksDBStateBackend使用RocksDB數據庫在本地磁盤上保存飛行中的數據。在檢查點上,整個RocksDB數據庫將被檢查點到配置的文件系統中,或者在非常大的狀態作業的情況下增量差異。同時,Apache Flink將一些最小的元數據存儲在JobManager的內存或Zookeeper中(對於高可用性情況)。RocksDB默認配置為執行異步快照。

使用RocksDBStateBackend時需要注意什麼:

  • RocksDB的每個密鑰和每個值的最大支持大小為每個2 ^ 31個字節。這是因為RocksDB的JNI橋API基於byte []。
  • 我們需要在此強調,對於使用具有合併操作的狀態(例如ListState)的有狀態流處理應用程序,可以累積超過2 ^ 31字節超時的值大小,這將導致它們在任何後續檢索時失敗。

何時使用RocksDBStateBackend:

  • RocksDBStateBackend最適合處理大狀態,長窗口或大鍵/值狀態的 Apache Flink有狀態流處理作業。
  • RocksDBStateBackend最適合每個高可用性設置。
  • RocksDBStateBackend是目前唯一可用於支持有狀態流處理應用程序的增量檢查點的狀態後端。

使用RocksDB時,狀態大小僅受可用磁盤空間量的限制,這使RocksDBStateBackend成為管理超大狀態的絕佳選擇。使用RocksDB時的權衡是所有狀態訪問和檢索都需要序列化(或反序列化)才能跨越JNI邊界。與上面提到的堆上後端相比,這可能會影響應用程序的吞吐量。
不同的狀態後端服務於多個開發人員要求,應在開始開發應用程序之前仔細考慮和進行廣泛規劃後選擇。這可確保選擇正確的狀態後端以最好地滿足應用程序和業務需求。

Apache Flink如何管理Kafka消費偏移量


在我們《Flink Friday Tip》的這一集中,我們將逐步說明Apache Flink如何與Apache Kafka協同工作,以確保Kafka主題的記錄以一次性保證進行處理。

檢查點是Apache Flink的內部機制,可以從故障中恢復。檢查點是Flink應用程序狀態的一致副本,包括輸入的讀取位置。如果發生故障,Flink將通過從檢查點加載應用程序狀態並從恢復的讀取位置繼續恢復應用程序,就像沒有發生任何事情一樣。您可以將檢查點視為保存計算機遊戲的當前狀態。如果你在遊戲中保存了自己的位置後發生了什麼事情,你可以隨時回過頭再試一次。

檢查點使Apache Flink具有容錯能力,並確保在發生故障時保留流應用程序的語義。應用程序可以定期觸發檢查點。

Apache Flink中的Kafka消費者將Flink的檢查點機制與有狀態運算符集成在一起,其狀態是所有Kafka分區中的讀取偏移量。觸發檢查點時,每個分區的偏移量都存儲在檢查點中。Flink的檢查點機制確保所有操作員任務的存儲狀態是一致的,即它們基於相同的輸入數據。當所有操作員任務成功存儲其狀態時,檢查點完成。因此,當從潛在的系統故障重新啟動時,系統提供一次性狀態更新保證。

下面我們將介紹Apache Flink如何在逐步指南中檢查Kafka消費者抵消。在我們的示例中,數據存儲在Flink的Job Master中。值得注意的是,在POC或生產用例下,數據通常存儲在外部文件存儲器(如HDFS或S3)中。

步驟1:

下面的示例從Kafka主題中讀取兩個分區,每個分區包含“A”,“B”,“C”,“D”,“E”作為消息。我們將兩個分區的偏移量設置為零。

有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量

第2步:

在第二步中,Kafka消費者開始從分區0讀取消息。消息“A”在“飛行中”處理,第一個消費者的偏移量變為1。


有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量


第3步:

在第三步中,消息“A”到達Flink Map Task。兩個消費者都讀取他們的下一個記錄(分區0的消息“B”和分區1的消息“A”)。兩個分區的偏移量分別更新為2和1。與此同時,Flink的Job Master決定在源頭觸發檢查點。

有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量

第4步:

在接下來的步驟中,Kafka使用者任務已經創建了狀態的快照(“offset = 2,1”),現在存儲在Apache Flink的Job Master中。源分別在來自分區0和1的消息“B”和“A”之後發出檢查點屏障。檢查點障礙用於對齊所有操作員任務的檢查點,並保證整個檢查點的一致性。消息“A”到達Flink Map Task,而頂級消費者繼續讀取其下一條記錄(消息“C”)。

有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量

第5步:

此步驟顯示Flink Map Task從兩個源和檢查點接收檢查點障礙,其狀態為Job Master。與此同時,消費者繼續從Kafka分區閱讀更多活動。


有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量


第6步:

此步驟顯示Flink Map Task在檢查其狀態後與Flink Job Master進行通信。當作業的所有任務確認其狀態為檢查點時,作業主管完成檢查點。從現在開始,檢查點可用於從故障中恢復。值得一提的是,Apache Flink不依賴於Kafka偏移來恢復潛在的系統故障。


有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量


在發生故障時恢復

如果發生故障(例如,工作人員故障),則重新啟動所有操作員任務,並將其狀態重置為上次完成的檢查點。如下圖所示。


有狀態流處理:Apache Flink狀態後端和如何管理Kafka消費偏移量


Kafka源分別從偏移量2和1開始,因為這是完成的檢查點的偏移量。當作業重新啟動時,我們可以期待正常的系統操作,就好像之前沒有發生故障一樣。

你可以找到關於如何使用的詳細信息和常見問題最好的Apache Flink 和Apache Kafka 在我們以前的博客文章的一個位置。


分享到:


相關文章: