劉彬同學準備寫一系列spark實戰系列,本文是第二篇,checkpoint的實現!贊!推薦給大家,希望大家喜歡和支持!
系列文章
SparkContext 初始化內部原理
checkpoint 檢查點是很多分佈式系統的常用容錯容災機制,其本質就是將系統運行時的內存數據結構和狀態持久化到磁盤上,在需要的時候對這些數據進行讀取,然後重新構造出運行時的狀態。在Spark中使用檢查點就是為了將RDD的狀態保存下來,在重新執行時就不需要計算,直接從檢查點讀取即可。
1Metadata checkpointing
保存流計算的定義信息到容錯存儲系統中,比如:HDFS(分佈式存儲系統),用來恢復應用程序中運行worker的節點故障,其中元數據包括:
Configuration:創建Spark Streaming應⽤用程序的配置信息
DStream operations:定義Streaming應⽤用程序的操作集合
Incomplete batches:操作存在隊列中的未完成的批次
2Data checkpointing
保存生成的RDD到高可靠的存儲系統中,這在有狀態transformation中是必須的,在這樣一個transformation中,生成的RDD依賴於之前批的RDD,隨著時間的推移,這個依賴鏈長度會持續增長,在恢復過程中,為了避免無限增長,有狀態的transformation的中間RDD會定期地存儲到可靠的存儲系統中,以截斷這個依賴鏈
在Spark中checkpoint主要通過CheckpointRDD和RDDCheckpointData實現的,下來就來看⼀一下這兩個的實現
https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
CheckpointRDD : ⽤用來從存儲中恢復檢查點的RDD
checkpointRDD重寫了RDD的五個方法:分別為doCheckpoint , checkpoint , localCheckpoint , getPartition , compute
doCheckpoint :用來保存檢查點
checkpoint :用來讀取檢查點數據
localcheckpoint:本地的檢查點
getPartition :獲取檢查點的分區數組
compute:用來從檢查點中恢復數據
checkpointRDD為抽象類,有兩個子類繼承了它,分別為LocalCheckPoint和ReliableCheckpoint,但是這兩個類都沒有完全實現checkpointRDD
3ReliableCheckpoint
ReliableCheckpoint中定義瞭如下屬性:
sc:即sparkcontext
checkpointPath:檢查點的目錄
_partitioner:調用方指定的分區計算器
hadoopConf:SparkContext的_hadoopConfiguration屬性,即hadoop配置信息
cpath:類型為org.apache.hadoop.fs.path表⽰示checkpoint path對應的hadoop⽂文件系統中的路徑
fs:使⽤用hadoopConf得到的org.apache.hadoop.fs.filesystem
broadcastedConf:調用SparkContext的broadcast方法對hadoopconf進行廣播後返回的Broadcase對象
partitioner:ReliableCheckpointRDD的分區計算器,優先採用_partitioner指定的,否則調用ReliableCheckpointRDD的伴⽣生對象的readCheckpointedPartitionerFile方法從checkpoint path指定的檢
查點目錄下讀取分區計算器
ReliableCheckpointRDD還提供了很多工具方法,比如:
writePartitionToCheckpointFile方法用於將RDD分區的數據寫入到檢查點目錄下的⽂文件中
writePartitionerToCheckpointDir:用於將分區計算器的數據寫入到檢查點的目錄下
writeRDDTocheckpointDirectory:用於將RDD的數據寫入檢查點目錄
1)調用SparkContext的runJob方法將數據寫入到檢查點目錄,將數據寫入磁盤的函數是ReliableCheckpointRDD的伴生對象的writePartitionToCheckpointFile方法
2)如果RDD有分區計算器,那麼調用ReliableCheckpointRDD的伴生對象writePartitionerToCheckpointDir將分區計算器的信息寫入到檢查點目錄
3)創建並返回ReliableCheckpointRDD
readCheckpointFile 方法用於從檢查點讀取RDD的數據,代碼如下:
4RDDCheckpointData的實現
RDDCheckpointData用於保存與檢查點相關的信息,每個RDDCheckpointData實例都與一個RDD實例相關
RDDCheckpointData中一共有三個屬性,分別為rdd,cpState,cpRDDcpState : 檢查點的狀態,cpstate的值來自於枚舉類型Checkpoint-State,CheckpointState中定義了檢查點的狀態,包括初始化完成Initialed,正在保存檢查點( CheckpointInProgress )和保存檢查
點完畢( Checkpointed )
cpRDD : 保存檢查點的RDD,即CheckpointRDD的實現類
RDDCheckpointData中也定義了一些方法:
isCheckpoint⽅方法⽤用於判斷是否已經為RDDCheckpointData關聯的RDD保存了檢查點數據
checkpoint⽅方法用於將RDDCheckpointData關聯的RDD數據保存到檢查點的模版方法中
如果檢查點狀態時Initialized,那麼將cpState設置為CheckpointInProgress,否則返回。
調用doCheckpoint方法保存檢查點並生成CheckpointRDD
由cpRDD持有剛⽣生成的checkpointRDD,然後將cpState設置成Checkpointed,最後調⽤用RDD的markCheckpointed⽅方法,清空依賴,清空依賴是因為現在已經有了,之前就不需要了
checkpointRDD :checkpointRDD方法用於獲取cpRDD持有的CheckpointRDD
getPartition:用來獲取CheckpointRDD的分區數組
閱讀更多 大數據和雲計算技術 的文章