checkpoint的实现

刘彬同学准备写一系列spark实战系列,本文是第二篇,checkpoint的实现!赞!推荐给大家,希望大家喜欢和支持!

系列文章

SparkContext 初始化内部原理

checkpoint 检查点是很多分布式系统的常用容错容灾机制,其本质就是将系统运行时的内存数据结构和状态持久化到磁盘上,在需要的时候对这些数据进行读取,然后重新构造出运行时的状态。在Spark中使用检查点就是为了将RDD的状态保存下来,在重新执行时就不需要计算,直接从检查点读取即可。

1Metadata checkpointing

保存流计算的定义信息到容错存储系统中,比如:HDFS(分布式存储系统),用来恢复应用程序中运行worker的节点故障,其中元数据包括:

  • Configuration:创建Spark Streaming应⽤用程序的配置信息

  • DStream operations:定义Streaming应⽤用程序的操作集合

  • Incomplete batches:操作存在队列中的未完成的批次

2Data checkpointing

保存生成的RDD到高可靠的存储系统中,这在有状态transformation中是必须的,在这样一个transformation中,生成的RDD依赖于之前批的RDD,随着时间的推移,这个依赖链长度会持续增长,在恢复过程中,为了避免无限增长,有状态的transformation的中间RDD会定期地存储到可靠的存储系统中,以截断这个依赖链

在Spark中checkpoint主要通过CheckpointRDD和RDDCheckpointData实现的,下来就来看⼀一下这两个的实现

https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

CheckpointRDD : ⽤用来从存储中恢复检查点的RDD

checkpoint的实现

checkpointRDD重写了RDD的五个方法:分别为doCheckpoint , checkpoint , localCheckpoint , getPartition , compute

doCheckpoint :用来保存检查点

checkpoint :用来读取检查点数据

localcheckpoint:本地的检查点

getPartition :获取检查点的分区数组

compute:用来从检查点中恢复数据

checkpointRDD为抽象类,有两个子类继承了它,分别为LocalCheckPoint和ReliableCheckpoint,但是这两个类都没有完全实现checkpointRDD

3ReliableCheckpoint

ReliableCheckpoint中定义了如下属性:

sc:即sparkcontext

checkpointPath:检查点的目录

_partitioner:调用方指定的分区计算器

hadoopConf:SparkContext的_hadoopConfiguration属性,即hadoop配置信息

cpath:类型为org.apache.hadoop.fs.path表⽰示checkpoint path对应的hadoop⽂文件系统中的路径

fs:使⽤用hadoopConf得到的org.apache.hadoop.fs.filesystem

broadcastedConf:调用SparkContext的broadcast方法对hadoopconf进行广播后返回的Broadcase对象

partitioner:ReliableCheckpointRDD的分区计算器,优先采用_partitioner指定的,否则调用ReliableCheckpointRDD的伴⽣生对象的readCheckpointedPartitionerFile方法从checkpoint path指定的检

查点目录下读取分区计算器

ReliableCheckpointRDD还提供了很多工具方法,比如:

writePartitionToCheckpointFile方法用于将RDD分区的数据写入到检查点目录下的⽂文件中

writePartitionerToCheckpointDir:用于将分区计算器的数据写入到检查点的目录下

writeRDDTocheckpointDirectory:用于将RDD的数据写入检查点目录

checkpoint的实现

1)调用SparkContext的runJob方法将数据写入到检查点目录,将数据写入磁盘的函数是ReliableCheckpointRDD的伴生对象的writePartitionToCheckpointFile方法

2)如果RDD有分区计算器,那么调用ReliableCheckpointRDD的伴生对象writePartitionerToCheckpointDir将分区计算器的信息写入到检查点目录

3)创建并返回ReliableCheckpointRDD

readCheckpointFile 方法用于从检查点读取RDD的数据,代码如下:

checkpoint的实现

4RDDCheckpointData的实现

RDDCheckpointData用于保存与检查点相关的信息,每个RDDCheckpointData实例都与一个RDD实例相关

RDDCheckpointData中一共有三个属性,分别为rdd,cpState,cpRDDcpState : 检查点的状态,cpstate的值来自于枚举类型Checkpoint-State,CheckpointState中定义了检查点的状态,包括初始化完成Initialed,正在保存检查点( CheckpointInProgress )和保存检查

点完毕( Checkpointed )

cpRDD : 保存检查点的RDD,即CheckpointRDD的实现类

RDDCheckpointData中也定义了一些方法:

isCheckpoint⽅方法⽤用于判断是否已经为RDDCheckpointData关联的RDD保存了检查点数据

checkpoint的实现

checkpoint⽅方法用于将RDDCheckpointData关联的RDD数据保存到检查点的模版方法中

checkpoint的实现

如果检查点状态时Initialized,那么将cpState设置为CheckpointInProgress,否则返回。

调用doCheckpoint方法保存检查点并生成CheckpointRDD

由cpRDD持有刚⽣生成的checkpointRDD,然后将cpState设置成Checkpointed,最后调⽤用RDD的markCheckpointed⽅方法,清空依赖,清空依赖是因为现在已经有了,之前就不需要了

checkpointRDD :checkpointRDD方法用于获取cpRDD持有的CheckpointRDD

getPartition:用来获取CheckpointRDD的分区数组


分享到:


相關文章: