RDD(Resilient Distributed Datasets,弹性分布式数据集)代表可并行操作元素的不可变分区集合。
为什么需要RDD
数据处理模型
RDD是一个容错的、并行的数据结构,可以控制将数据存储到磁盘或者内存,能够获取数据的分区。RDD提供了一组类似于Scala的操作,比如map、flatMap、filter、reduceByKey、join、mapPartitions等,这些操作实际是对RDD进行转换(transformation)。此外,RDD还提供了collect、foreach、count、reduce、countByKey等操作完成数据计算的动作(action)。
当前的大数据应用场景非常丰富,如流式计算、图计算、机器学习等,它们既有相似之处,有各有不同。为了能够对所有场景下的数据处理使用统一的方式,抽象出RDD这一模型。
通常数据处理的模型包括迭代计算、关系查询、MapReduce、流式处理等。Hadoop采用MapReduce模型,Storm采用流式处理模型,而Spark则借助RDD实现了以上所有模型。
依赖划分原则
一个RDD包含一个或者多个分区,每个分区实际是一个数据集合的片段。在构建DAG的过程中,会将RDD用依赖关系串联起来。每个RDD都有其依赖(除了最顶级RDD的依赖是空列表),这些依赖分为窄依赖(即NarrowDependency)和宽依赖(即ShuffleDepebdebcy)两种。
为什么要对依赖进行区分?从功能角度讲是不一样的,NarrowDependency会被划分到同一个Stage中,这样就能以管道的方式迭代执行。ShuffleDependency由于依赖的分区Task不止一个,所以往往需要跨节点传输数据。从容灾角度讲,恢复计算结果的方式不同。NarrowDependency只需要重新执行父RDD的丢失分区的计算即可恢复,ShuffleDependency则需要考虑恢复所有父RDD的丢失分区。
数据处理效率
RDD的计算过程允许在多个节点上并发执行。如果数据量很大,可以适当增加分区数量,这种根据硬件条件对并发任务数量的控制,能更好地利用各种资源,也能有效提高Spark的数据处理效率。
容错处理
RDD本身是一个不可变的数据集,当某个Worker节点上的Task失败时,可以利用DAG重新调度计算这些失败的Task(执行以成功的Task可以从CheckPoint(检查点)中读取,而不用重新计算)。在流式计算的场景中,Spark需要记录日志和CheckPoint,以便利用CheckPoint和日志对数据恢复。
源码分析
抽象类RDD定义了所有RDD的规范,RDD的属性包括:
RDD采用模板方法的模式设计,抽象类RDD中定义了模板方法及一些未实现的接口,这些接口需要RDD的各个子类实现。
接口
compute:对RDD的分区进行计算。getPartitions:获取当前RDD的所有分区。getDependencies:获取当前RDD的所有依赖getPreferredLocations:获取某一分区的优先位置。是实现数据本地行的主要方法模板方法
iterator方法: 获取 split 指定的 Partition 对应的数据的迭代器,有了这个迭代器就能一条一条取出数据来按 compute chain 来执行一个个transform 操作。其先判断 RDD 的 storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该Partition对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算(compute属性)
partitions方法:获取RDD的分区数据partitions方法查找分区数组的优先级为:从CheckPoint查找-->读取partition_属性-->调用getPartitions方法获取。
preferredLocations方法:优先调用Checkpoint中保存的RDD的getPreferredLocations方法获取指定分区的优先位置,当没有保存Checkpoint时,调用自身的getPreferredLocations方法获取指定分区的优先位置。dependencies方法:获取当前RDD的所有依赖序列dependencies方法执行的步骤如下:
1、从CheckPoint中获取RDD,并将这些RDD封装为OneToOneDependency列表。如果从CheckPoint中获取到RDD的依赖,则返回RDD的依赖,否则进去下一步。
2、如果dependencies_等于null,那么调用子类实现的getDependencies方法获取当前RDD的依赖后赋予dependencies_,最后返回dependencies_。
其他方法
context方法:返回_sc(即SparkContext)getStorageLevel方法:返回当前RDD的StorageLevelgetNarrowAncestors方法:获取当前RDD的祖先依赖中属于窄依赖的RDD序列RDDInfo
RDDInfo用于描述RDD的信息,RDDInfo提供的信息有:
id:RDD的id。name:RDD的名称。numPartitions:RDD的分区数量。storageLevel:RDD的存储级别。parentIds:RDD的父RDD的id序列。一个RDD会有零到多个父RDD。callSite:RDD的用户调用栈信息。scope:RDD的操作范围。scope的类型是RDDOperationScope,每一个RDD都有一个RDDOperationScope。RDDOperationScope与Stage或Job之间并无特殊关系,一个RDDOperationScope可以存在于一个Stage内,也可以跨越多个Job。numCachedPartitions:缓存的分区数量。memSize:使用的内存大小。diskSize:使用的磁盘大小。externalBlockStoreSize: Block存储在外部大小。方法
isCached方法:是否已经缓存。compare方法:由于RDDInfo继承Ordered,所以重写compare方法用于排序。伴生对象
定义了fromRdd方法,用于从RDD构建出对应的RDDInfo。具体执行步骤如下:
1、获取当前RDD的名称(即name属性)作为RDDInfo的name属性,如果RDD还没有名称,那么调用Utils工具类的getFormattedClassName方法生成RDDInfo的name属性。
2、获取当前RDD依赖的所有父RDD的身份标识作为RDDInfo的parentIds属性。
3、创建RDDInfo对象。
RDDInfo源码,位于org.apache.spark.storage.RDDInfo