Spark-Core:RDD

RDD是分佈式內存的一個抽象概念,是一種高度受限的共享內存模型,即RDD是隻讀的記錄分區的集合,能夠跨越集群所有節點並行計算,是一種基於工作集的應用抽象。

存儲

每個RDD的數據都以Block的形式存儲於多臺機器上,每個Executor會啟動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點上的BlockManagerMaster保存,BlockManagerSlave生成Block後向BlockManagerMaster註冊該Block,BlockManagerMaster管理RDD與Block的關係,當RDD不在需要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。

BlockManager

BlockManager管理RDD的物理分區,每個Block就是節點上對應的一個數據塊,可以存儲在內存或者磁盤上。而RDD中的partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中相當於數據的一個元數據結構(一個RDD就是一組分區),存儲著數據分區及其邏輯結構映射關係,存儲著RDD之前的依賴轉換關係。分區是一個邏輯概念,Transformation前後的新舊分區在物理上可能是同一塊內存存儲。

BlockManager在每個節點上運行管理Block(Driver和Executor),提供了一個接口檢索本地和遠程的存儲變量,如memory、disk、off-heap。使用BlockManager前必須先初始化。

BlockManager聲明源碼,位於org.apache.spark.storage.BlockManager

 private[spark] class BlockManager(
executorId: String,
rpcEnv: RpcEnv,
val master: BlockManagerMaster,

val serializerManager: SerializerManager,
val conf: SparkConf,
memoryManager: MemoryManager,
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
val blockTransferService: BlockTransferService,
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with BlockEvictionHandler with Logging {

BlockManagerMaster會持有整個Application的Block的位置、Block所佔用的存儲空間等元數據信息,在Spark的Driver的DAGScheduler中,就是通過這些信息來確定數據運行的本地性的。

RDD

RDD作為泛型的抽象數據結構,支持兩種計算操作算子:Transformation(轉化)與Action(行為)。且RDD的寫操作是粗粒度的,讀操作既可以是粗粒度的,也可以是細粒度的。

RDD聲明源碼,位於org.apache.spark.rdd.RDD

 abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

其中SparkContext是Spark功能的主要入口點,一個SparkContext代表一個集群連接,可以用其在集群中創建RDD、累加變量、廣播變量等。

五大特性

分區列表

Spark RDD是被分區的,每一個分區都會被一個計算任務(Task)處理,分區數決定並行計算數量,RDD的並行度默認從父RDD傳給子RDD。

默認情況下,一個HDFS上的數據分片就是一個Partition,RDD分片數決定了並行計算的力度,可以再創建RDD時指定RDD分區個數,如果不指定分區數量,當RDD集合創建時,則默認分區數量為該程序所分配到的資源的CPU核數,如果是從HDFS文件創建,默認為文件的Block數。

計算函數

每個分區都會有計算函數,Spark的RDD的計算函數是以分片為基礎單位的,每個RDD都會實現compute函數,對具體的分片進行計算,RDD中的分片是並行的,所以是分佈式並行計算。

依賴關係

RDD之間的依賴有兩種:窄依賴(Narrow Dependency)、寬依賴(Wide Dependency),其中的窄依賴值得是每一個父RDD的Partition最多被子RDD的一個Partition所使用,而寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition。RDD是Spark的核心數據結構,通過RDD的依賴關係形成調度關係。

分區函數

每個key-value形式的RDD都有Partitioner屬性,決定了RDD如何分區。RDD的分區函數,想控制RDD分區函數的時候可以使用Partitioner傳入相關參數,如HashPartitioner、RangePartitioner,它本身針對key-value的形式,如果不是key-value的形式,就不會有具體的Partitioner。

Partitioner決定了下一步會產生多少並行的分片,同時,也決定了當前並行Shuffle輸出的並行數據,從而是Spark具有能夠控制數據在不同節點上分區的特性。

Partition的個數決定了每個Stage的Task個數。

優先位置

每個分區都有一個優先位置列表。會存儲每個Partition的優先位置,對於一個HDFS文件來說,就是每個Partition塊位置。Spark本身在進行任務調度時候,會盡可能將任務分配到處理數據塊所在的最優位置。

RDD類源碼文件中的4個方法和一個屬性對應RDD的五大特性

 /**
* :: DeveloperApi ::
* 通過子類實現分區的計算函數
*/
@DeveloperApi

def compute(split: Partition, context: TaskContext): Iterator[T]

/**
* 返回一個RDD分區列表,
*/
protected def getPartitions: Array[Partition]

/**
* 返回依賴的父RDD列表
*/
protected def getDependencies: Seq[Dependency[_]] = deps

/**
* 返回RDD的優先位置,可選
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

/**
* 指定如何分區,可選
*/
@transient val partitioner: Option[Partitioner] = None


分享到:


相關文章: