Spark任务优化-checkpoint使用

Spark任务优化-checkpoint使用

下面结合persist来讨论checkpoint。

1.什么样的 RDD 需要 checkpoint

运算时间很长或运算量太大才能得到的 RDD,computing chain 过长或依赖其他 RDD 很多的 RDD。 实际上,将 ShuffleMapTask 的输出结果存放到本地磁盘也算是 checkpoint,只不过这个 checkpoint 的主要目的是去 partition 输出数据。

2.什么时候 checkpoint

cache 机制是每计算出一个要 cache 的 partition 就直接将其 cache 到内存了。但 checkpoint 没有使用这种第一次计算得到就存储的方法,而是等到 job 结束后另外启动专门的 job 去完成 checkpoint 。也就是说需要 checkpoint 的 RDD 会被计算两次。因此,在使用 rdd.checkpoint() 的时候,建议加上 rdd.cache(),这样第二次运行的 job 就不用再去计算该 rdd 了,直接读取 cache 写磁盘。其实 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以做到 rdd 第一次被计算得到时就存储到磁盘上,但这个 persist 和 checkpoint 有很多不同,之后会讨论。

3.checkpoint 怎么实现?

RDD 需要经过 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 这几个阶段才能被 checkpoint。

Initialized: 首先 driver program 需要使用 rdd.checkpoint() 去设定哪些 rdd 需要 checkpoint,设定后,该 rdd 就接受 RDDCheckpointData 管理。用户还要设定 checkpoint 的存储路径,一般在 HDFS 上。

marked for checkpointing:初始化后,RDDCheckpointData 会将 rdd 标记为 MarkedForCheckpoint。

checkpointing in progress:每个 job 运行结束后会调用 finalRdd.doCheckpoint(),finalRdd 会顺着 computing chain 回溯扫描,碰到要 checkpoint 的 RDD 就将其标记为 CheckpointingInProgress,然后将写磁盘(比如写 HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 节点上的 blockManager。完成以后,启动一个 job 来完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)))。

checkpointed:job 完成 checkpoint 后,将该 rdd 的 dependency 全部清掉,并设定该 rdd 状态为 checkpointed。然后,为该 rdd 强加一个依赖,设置该 rdd 的 parent rdd 为 CheckpointRDD,该 CheckpointRDD 负责以后读取在文件系统上的 checkpoint 文件,生成该 rdd 的 partition。

有意思的是我在 driver program 里 checkpoint 了两个 rdd,结果只有一个(下面的 result)被 checkpoint 成功,pairs2 没有被 checkpoint,也不知道是 bug 还是故意只 checkpoint 下游的 RDD:

val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),

(4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))

val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))

val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint

val result = pairs1.join(pairs2)

result.checkpoint

4.怎么读取 checkpoint 过的 RDD

在 runJob() 的时候会先调用 finalRDD 的 partitions() 来确定最后会有多个 task。rdd.partitions() 会去检查(通过 RDDCheckpointData 去检查,因为它负责管理被 checkpoint 过的 rdd)该 rdd 是会否被 checkpoint 过了,如果该 rdd 已经被 checkpoint 过了,直接返回该 rdd 的 partitions 也就是 Array[Partition]。

当调用 rdd.iterator() 去计算该 rdd 的 partition 的时候,会调用 computeOrReadCheckpoint(split: Partition) 去查看该 rdd 是否被 checkpoint 过了,如果是,就调用该 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),CheckpointRDD 负责读取文件系统上的文件,生成该 rdd 的 partition。这就解释了为什么那么 trickly 地为 checkpointed rdd 添加一个 parent CheckpointRDD。

5.cache 与 checkpoint 的区别

There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk(其实只有 memory). But the lineage(也就是 computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).


分享到:


相關文章: