checkpoint的實現

劉彬同學準備寫一系列spark實戰系列,本文是第二篇,checkpoint的實現!贊!推薦給大家,希望大家喜歡和支持!

系列文章

SparkContext 初始化內部原理

checkpoint 檢查點是很多分佈式系統的常用容錯容災機制,其本質就是將系統運行時的內存數據結構和狀態持久化到磁盤上,在需要的時候對這些數據進行讀取,然後重新構造出運行時的狀態。在Spark中使用檢查點就是為了將RDD的狀態保存下來,在重新執行時就不需要計算,直接從檢查點讀取即可。

1Metadata checkpointing

保存流計算的定義信息到容錯存儲系統中,比如:HDFS(分佈式存儲系統),用來恢復應用程序中運行worker的節點故障,其中元數據包括:

  • Configuration:創建Spark Streaming應⽤用程序的配置信息

  • DStream operations:定義Streaming應⽤用程序的操作集合

  • Incomplete batches:操作存在隊列中的未完成的批次

2Data checkpointing

保存生成的RDD到高可靠的存儲系統中,這在有狀態transformation中是必須的,在這樣一個transformation中,生成的RDD依賴於之前批的RDD,隨著時間的推移,這個依賴鏈長度會持續增長,在恢復過程中,為了避免無限增長,有狀態的transformation的中間RDD會定期地存儲到可靠的存儲系統中,以截斷這個依賴鏈

在Spark中checkpoint主要通過CheckpointRDD和RDDCheckpointData實現的,下來就來看⼀一下這兩個的實現

https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

CheckpointRDD : ⽤用來從存儲中恢復檢查點的RDD

checkpoint的實現

checkpointRDD重寫了RDD的五個方法:分別為doCheckpoint , checkpoint , localCheckpoint , getPartition , compute

doCheckpoint :用來保存檢查點

checkpoint :用來讀取檢查點數據

localcheckpoint:本地的檢查點

getPartition :獲取檢查點的分區數組

compute:用來從檢查點中恢復數據

checkpointRDD為抽象類,有兩個子類繼承了它,分別為LocalCheckPoint和ReliableCheckpoint,但是這兩個類都沒有完全實現checkpointRDD

3ReliableCheckpoint

ReliableCheckpoint中定義瞭如下屬性:

sc:即sparkcontext

checkpointPath:檢查點的目錄

_partitioner:調用方指定的分區計算器

hadoopConf:SparkContext的_hadoopConfiguration屬性,即hadoop配置信息

cpath:類型為org.apache.hadoop.fs.path表⽰示checkpoint path對應的hadoop⽂文件系統中的路徑

fs:使⽤用hadoopConf得到的org.apache.hadoop.fs.filesystem

broadcastedConf:調用SparkContext的broadcast方法對hadoopconf進行廣播後返回的Broadcase對象

partitioner:ReliableCheckpointRDD的分區計算器,優先採用_partitioner指定的,否則調用ReliableCheckpointRDD的伴⽣生對象的readCheckpointedPartitionerFile方法從checkpoint path指定的檢

查點目錄下讀取分區計算器

ReliableCheckpointRDD還提供了很多工具方法,比如:

writePartitionToCheckpointFile方法用於將RDD分區的數據寫入到檢查點目錄下的⽂文件中

writePartitionerToCheckpointDir:用於將分區計算器的數據寫入到檢查點的目錄下

writeRDDTocheckpointDirectory:用於將RDD的數據寫入檢查點目錄

checkpoint的實現

1)調用SparkContext的runJob方法將數據寫入到檢查點目錄,將數據寫入磁盤的函數是ReliableCheckpointRDD的伴生對象的writePartitionToCheckpointFile方法

2)如果RDD有分區計算器,那麼調用ReliableCheckpointRDD的伴生對象writePartitionerToCheckpointDir將分區計算器的信息寫入到檢查點目錄

3)創建並返回ReliableCheckpointRDD

readCheckpointFile 方法用於從檢查點讀取RDD的數據,代碼如下:

checkpoint的實現

4RDDCheckpointData的實現

RDDCheckpointData用於保存與檢查點相關的信息,每個RDDCheckpointData實例都與一個RDD實例相關

RDDCheckpointData中一共有三個屬性,分別為rdd,cpState,cpRDDcpState : 檢查點的狀態,cpstate的值來自於枚舉類型Checkpoint-State,CheckpointState中定義了檢查點的狀態,包括初始化完成Initialed,正在保存檢查點( CheckpointInProgress )和保存檢查

點完畢( Checkpointed )

cpRDD : 保存檢查點的RDD,即CheckpointRDD的實現類

RDDCheckpointData中也定義了一些方法:

isCheckpoint⽅方法⽤用於判斷是否已經為RDDCheckpointData關聯的RDD保存了檢查點數據

checkpoint的實現

checkpoint⽅方法用於將RDDCheckpointData關聯的RDD數據保存到檢查點的模版方法中

checkpoint的實現

如果檢查點狀態時Initialized,那麼將cpState設置為CheckpointInProgress,否則返回。

調用doCheckpoint方法保存檢查點並生成CheckpointRDD

由cpRDD持有剛⽣生成的checkpointRDD,然後將cpState設置成Checkpointed,最後調⽤用RDD的markCheckpointed⽅方法,清空依賴,清空依賴是因為現在已經有了,之前就不需要了

checkpointRDD :checkpointRDD方法用於獲取cpRDD持有的CheckpointRDD

getPartition:用來獲取CheckpointRDD的分區數組


分享到:


相關文章: