Spark-Core:RDD详解

RDD(Resilient Distributed Datasets,弹性分布式数据集)代表可并行操作元素的不可变分区集合。

为什么需要RDD

数据处理模型

RDD是一个容错的、并行的数据结构,可以控制将数据存储到磁盘或者内存,能够获取数据的分区。RDD提供了一组类似于Scala的操作,比如map、flatMap、filter、reduceByKey、join、mapPartitions等,这些操作实际是对RDD进行转换(transformation)。此外,RDD还提供了collect、foreach、count、reduce、countByKey等操作完成数据计算的动作(action)。

当前的大数据应用场景非常丰富,如流式计算、图计算、机器学习等,它们既有相似之处,有各有不同。为了能够对所有场景下的数据处理使用统一的方式,抽象出RDD这一模型。

通常数据处理的模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,Storm采用流式处理模型,而Spark则借助RDD实现了以上所有模型。

依赖划分原则

一个RDD包含一个或者多个分区,每个分区实际是一个数据集合的片段。在构建DAG的过程中,会将RDD用依赖关系串联起来。每个RDD都有其依赖(除了最顶级RDD的依赖是空列表),这些依赖分为窄依赖(即NarrowDependency)和宽依赖(即ShuffleDepebdebcy)两种。

为什么要对依赖进行区分?从功能角度讲是不一样的,NarrowDependency会被划分到同一个Stage中,这样就能以管道的方式迭代执行。ShuffleDependency由于依赖的分区Task不止一个,所以往往需要跨节点传输数据。从容灾角度讲,恢复计算结果的方式不同。NarrowDependency只需要重新执行父RDD的丢失分区的计算即可恢复,ShuffleDependency则需要考虑恢复所有父RDD的丢失分区。

数据处理效率

RDD的计算过程允许在多个节点上并发执行。如果数据量很大,可以适当增加分区数量,这种根据硬件条件对并发任务数量的控制,能更好地利用各种资源,也能有效提高Spark的数据处理效率。

容错处理

RDD本身是一个不可变的数据集,当某个Worker节点上的Task失败时,可以利用DAG重新调度计算这些失败的Task(执行以成功的Task可以从CheckPoint(检查点)中读取,而不用重新计算)。在流式计算的场景中,Spark需要记录日志和CheckPoint,以便利用CheckPoint和日志对数据恢复。

源码分析

抽象类RDD定义了所有RDD的规范,RDD的属性包括:

_sc:指SparkContext。_sc由@transient修饰,所以此属性不会被序列化。deps:构造器参数之一,是Dependency的序列,用于存储当前RDD的依赖。RDD的子类在实现是不一定会传递此参数。由于deps由@transient修饰,所以此属性不会被序列化。partitioner:当前RDD的分区计算器。partitioner由@transient修饰,所以此属性不会被序列化。id:当前RDD的唯一身份标识。此属性通过调用SparkContext的nextRddId属性生成。name:RDD的名称。name由@transient修饰,所以此属性不会被序列化。dependencies_:与deps相同,但是可以被序列化。partitions_:存储当前RDD的所有分区数组。partitions_由@transient修饰,所以此属性不会被序列化。storageLevel:当前RDD的存储级别。creationSite:创建当前RDD的用户代码。creationSite由@transient修饰,所以此属性不会被序列化。scope:当前RDD的操作作用域。scope由@transient修饰,所以此属性不会被序列化。checkpointData:当前RDD的检查点数据。checkpointAllMarkedAncestors:是否对所有标记了需要保存检查点的祖先保存检查点。doCheckpointCalled:是否已经调用了doCheckpoint方法设置检查点。此属性可以阻止对RDD多次设置检查点。doCheckpointCalled由@transient修饰,所以此属性不会被序列化。

RDD采用模板方法的模式设计,抽象类RDD中定义了模板方法及一些未实现的接口,这些接口需要RDD的各个子类实现。

接口

compute:对RDD的分区进行计算。getPartitions:获取当前RDD的所有分区。getDependencies:获取当前RDD的所有依赖getPreferredLocations:获取某一分区的优先位置。是实现数据本地行的主要方法

模板方法

iterator方法: 获取 split 指定的 Partition 对应的数据的迭代器,有了这个迭代器就能一条一条取出数据来按 compute chain 来执行一个个transform 操作。

其先判断 RDD 的 storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该Partition对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算(compute属性)

partitions方法:获取RDD的分区数据

partitions方法查找分区数组的优先级为:从CheckPoint查找-->读取partition_属性-->调用getPartitions方法获取。

preferredLocations方法:优先调用Checkpoint中保存的RDD的getPreferredLocations方法获取指定分区的优先位置,当没有保存Checkpoint时,调用自身的getPreferredLocations方法获取指定分区的优先位置。dependencies方法:获取当前RDD的所有依赖序列

dependencies方法执行的步骤如下:

1、从CheckPoint中获取RDD,并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进去下一步。

2、如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies_,最后返回dependencies_。

其他方法

context方法:返回_sc(即SparkContext)getStorageLevel方法:返回当前RDD的StorageLevelgetNarrowAncestors方法:获取当前RDD的祖先依赖中属于窄依赖的RDD序列

RDDInfo

RDDInfo用于描述RDD的信息,RDDInfo提供的信息有:

id:RDD的id。name:RDD的名称。numPartitions:RDD的分区数量。storageLevel:RDD的存储级别。parentIds:RDD的父RDD的id序列。一个RDD会有零到多个父RDD。callSite:RDD的用户调用栈信息。scope:RDD的操作范围。scope的类型是RDDOperationScope,每一个RDD都有一个RDDOperationScope。RDDOperationScope与Stage或Job之间并无特殊关系,一个RDDOperationScope可以存在于一个Stage内,也可以跨越多个Job。numCachedPartitions:缓存的分区数量。memSize:使用的内存大小。diskSize:使用的磁盘大小。externalBlockStoreSize: Block存储在外部大小。

方法

isCached方法:是否已经缓存。compare方法:由于RDDInfo继承Ordered,所以重写compare方法用于排序。

伴生对象

定义了fromRdd方法,用于从RDD构建出对应的RDDInfo。具体执行步骤如下:

1、获取当前RDD的名称(即name属性)作为RDDInfo的name属性,如果RDD还没有名称,那么调用Utils工具类的getFormattedClassName方法生成RDDInfo的name属性。

2、获取当前RDD依赖的所有父RDD的身份标识作为RDDInfo的parentIds属性。

3、创建RDDInfo对象。

RDDInfo源码,位于org.apache.spark.storage.RDDInfo