RDD(Resilient Distributed Datasets,彈性分佈式數據集)代表可並行操作元素的不可變分區集合。
為什麼需要RDD
數據處理模型
RDD是一個容錯的、並行的數據結構,可以控制將數據存儲到磁盤或者內存,能夠獲取數據的分區。RDD提供了一組類似於Scala的操作,比如map、flatMap、filter、reduceByKey、join、mapPartitions等,這些操作實際是對RDD進行轉換(transformation)。此外,RDD還提供了collect、foreach、count、reduce、countByKey等操作完成數據計算的動作(action)。
當前的大數據應用場景非常豐富,如流式計算、圖計算、機器學習等,它們既有相似之處,有各有不同。為了能夠對所有場景下的數據處理使用統一的方式,抽象出RDD這一模型。
通常數據處理的模型包括迭代計算、關係查詢、MapReduce、流式處理等。Hadoop採用MapReduce模型,Storm採用流式處理模型,而Spark則藉助RDD實現了以上所有模型。
依賴劃分原則
一個RDD包含一個或者多個分區,每個分區實際是一個數據集合的片段。在構建DAG的過程中,會將RDD用依賴關係串聯起來。每個RDD都有其依賴(除了最頂級RDD的依賴是空列表),這些依賴分為窄依賴(即NarrowDependency)和寬依賴(即ShuffleDepebdebcy)兩種。
為什麼要對依賴進行區分?從功能角度講是不一樣的,NarrowDependency會被劃分到同一個Stage中,這樣就能以管道的方式迭代執行。ShuffleDependency由於依賴的分區Task不止一個,所以往往需要跨節點傳輸數據。從容災角度講,恢復計算結果的方式不同。NarrowDependency只需要重新執行父RDD的丟失分區的計算即可恢復,ShuffleDependency則需要考慮恢復所有父RDD的丟失分區。
數據處理效率
RDD的計算過程允許在多個節點上併發執行。如果數據量很大,可以適當增加分區數量,這種根據硬件條件對併發任務數量的控制,能更好地利用各種資源,也能有效提高Spark的數據處理效率。
容錯處理
RDD本身是一個不可變的數據集,當某個Worker節點上的Task失敗時,可以利用DAG重新調度計算這些失敗的Task(執行以成功的Task可以從CheckPoint(檢查點)中讀取,而不用重新計算)。在流式計算的場景中,Spark需要記錄日誌和CheckPoint,以便利用CheckPoint和日誌對數據恢復。
源碼分析
抽象類RDD定義了所有RDD的規範,RDD的屬性包括:
RDD採用模板方法的模式設計,抽象類RDD中定義了模板方法及一些未實現的接口,這些接口需要RDD的各個子類實現。
接口
compute:對RDD的分區進行計算。getPartitions:獲取當前RDD的所有分區。getDependencies:獲取當前RDD的所有依賴getPreferredLocations:獲取某一分區的優先位置。是實現數據本地行的主要方法模板方法
iterator方法: 獲取 split 指定的 Partition 對應的數據的迭代器,有了這個迭代器就能一條一條取出數據來按 compute chain 來執行一個個transform 操作。其先判斷 RDD 的 storageLevel 是否為 NONE,若不是,則嘗試從緩存中讀取,讀取不到則通過計算來獲取該Partition對應的數據的迭代器;若是,嘗試從 checkpoint 中獲取 Partition 對應數據的迭代器,若 checkpoint 不存在則通過計算(compute屬性)
partitions方法:獲取RDD的分區數據partitions方法查找分區數組的優先級為:從CheckPoint查找-->讀取partition_屬性-->調用getPartitions方法獲取。
preferredLocations方法:優先調用Checkpoint中保存的RDD的getPreferredLocations方法獲取指定分區的優先位置,當沒有保存Checkpoint時,調用自身的getPreferredLocations方法獲取指定分區的優先位置。dependencies方法:獲取當前RDD的所有依賴序列dependencies方法執行的步驟如下:
1、從CheckPoint中獲取RDD,並將這些RDD封裝為OneToOneDependency列表。如果從CheckPoint中獲取到RDD的依賴,則返回RDD的依賴,否則進去下一步。
2、如果dependencies_等於null,那麼調用子類實現的getDependencies方法獲取當前RDD的依賴後賦予dependencies_,最後返回dependencies_。
其他方法
context方法:返回_sc(即SparkContext)getStorageLevel方法:返回當前RDD的StorageLevelgetNarrowAncestors方法:獲取當前RDD的祖先依賴中屬於窄依賴的RDD序列RDDInfo
RDDInfo用於描述RDD的信息,RDDInfo提供的信息有:
id:RDD的id。name:RDD的名稱。numPartitions:RDD的分區數量。storageLevel:RDD的存儲級別。parentIds:RDD的父RDD的id序列。一個RDD會有零到多個父RDD。callSite:RDD的用戶調用棧信息。scope:RDD的操作範圍。scope的類型是RDDOperationScope,每一個RDD都有一個RDDOperationScope。RDDOperationScope與Stage或Job之間並無特殊關係,一個RDDOperationScope可以存在於一個Stage內,也可以跨越多個Job。numCachedPartitions:緩存的分區數量。memSize:使用的內存大小。diskSize:使用的磁盤大小。externalBlockStoreSize: Block存儲在外部大小。方法
isCached方法:是否已經緩存。compare方法:由於RDDInfo繼承Ordered,所以重寫compare方法用於排序。伴生對象
定義了fromRdd方法,用於從RDD構建出對應的RDDInfo。具體執行步驟如下:
1、獲取當前RDD的名稱(即name屬性)作為RDDInfo的name屬性,如果RDD還沒有名稱,那麼調用Utils工具類的getFormattedClassName方法生成RDDInfo的name屬性。
2、獲取當前RDD依賴的所有父RDD的身份標識作為RDDInfo的parentIds屬性。
3、創建RDDInfo對象。
RDDInfo源碼,位於org.apache.spark.storage.RDDInfo