Spark-Core:RDD詳解

RDD(Resilient Distributed Datasets,彈性分佈式數據集)代表可並行操作元素的不可變分區集合。

為什麼需要RDD

數據處理模型

RDD是一個容錯的、並行的數據結構,可以控制將數據存儲到磁盤或者內存,能夠獲取數據的分區。RDD提供了一組類似於Scala的操作,比如map、flatMap、filter、reduceByKey、join、mapPartitions等,這些操作實際是對RDD進行轉換(transformation)。此外,RDD還提供了collect、foreach、count、reduce、countByKey等操作完成數據計算的動作(action)。

當前的大數據應用場景非常豐富,如流式計算、圖計算、機器學習等,它們既有相似之處,有各有不同。為了能夠對所有場景下的數據處理使用統一的方式,抽象出RDD這一模型。

通常數據處理的模型包括迭代計算、關係查詢、MapReduce、流式處理等。Hadoop採用MapReduce模型,Storm採用流式處理模型,而Spark則藉助RDD實現了以上所有模型。

依賴劃分原則

一個RDD包含一個或者多個分區,每個分區實際是一個數據集合的片段。在構建DAG的過程中,會將RDD用依賴關係串聯起來。每個RDD都有其依賴(除了最頂級RDD的依賴是空列表),這些依賴分為窄依賴(即NarrowDependency)和寬依賴(即ShuffleDepebdebcy)兩種。

為什麼要對依賴進行區分?從功能角度講是不一樣的,NarrowDependency會被劃分到同一個Stage中,這樣就能以管道的方式迭代執行。ShuffleDependency由於依賴的分區Task不止一個,所以往往需要跨節點傳輸數據。從容災角度講,恢復計算結果的方式不同。NarrowDependency只需要重新執行父RDD的丟失分區的計算即可恢復,ShuffleDependency則需要考慮恢復所有父RDD的丟失分區。

數據處理效率

RDD的計算過程允許在多個節點上併發執行。如果數據量很大,可以適當增加分區數量,這種根據硬件條件對併發任務數量的控制,能更好地利用各種資源,也能有效提高Spark的數據處理效率。

容錯處理

RDD本身是一個不可變的數據集,當某個Worker節點上的Task失敗時,可以利用DAG重新調度計算這些失敗的Task(執行以成功的Task可以從CheckPoint(檢查點)中讀取,而不用重新計算)。在流式計算的場景中,Spark需要記錄日誌和CheckPoint,以便利用CheckPoint和日誌對數據恢復。

源碼分析

抽象類RDD定義了所有RDD的規範,RDD的屬性包括:

  • _sc:指SparkContext。_sc由@transient修飾,所以此屬性不會被序列化。
  • deps:構造器參數之一,是Dependency的序列,用於存儲當前RDD的依賴。RDD的子類在實現是不一定會傳遞此參數。由於deps由@transient修飾,所以此屬性不會被序列化。
  • partitioner:當前RDD的分區計算器。partitioner由@transient修飾,所以此屬性不會被序列化。
  • id:當前RDD的唯一身份標識。此屬性通過調用SparkContext的nextRddId屬性生成。
  • name:RDD的名稱。name由@transient修飾,所以此屬性不會被序列化。
  • dependencies_:與deps相同,但是可以被序列化。
  • partitions_:存儲當前RDD的所有分區數組。partitions_由@transient修飾,所以此屬性不會被序列化。
  • storageLevel:當前RDD的存儲級別。
  • creationSite:創建當前RDD的用戶代碼。creationSite由@transient修飾,所以此屬性不會被序列化。
  • scope:當前RDD的操作作用域。scope由@transient修飾,所以此屬性不會被序列化。
  • checkpointData:當前RDD的檢查點數據。
  • checkpointAllMarkedAncestors:是否對所有標記了需要保存檢查點的祖先保存檢查點。
  • doCheckpointCalled:是否已經調用了doCheckpoint方法設置檢查點。此屬性可以阻止對RDD多次設置檢查點。doCheckpointCalled由@transient修飾,所以此屬性不會被序列化。

RDD採用模板方法的模式設計,抽象類RDD中定義了模板方法及一些未實現的接口,這些接口需要RDD的各個子類實現。

接口

  • compute:對RDD的分區進行計算。
  • getPartitions:獲取當前RDD的所有分區。
  • getDependencies:獲取當前RDD的所有依賴
  • getPreferredLocations:獲取某一分區的優先位置。是實現數據本地行的主要方法

模板方法

  • iterator方法: 獲取 split 指定的 Partition 對應的數據的迭代器,有了這個迭代器就能一條一條取出數據來按 compute chain 來執行一個個transform 操作。

其先判斷 RDD 的 storageLevel 是否為 NONE,若不是,則嘗試從緩存中讀取,讀取不到則通過計算來獲取該Partition對應的數據的迭代器;若是,嘗試從 checkpoint 中獲取 Partition 對應數據的迭代器,若 checkpoint 不存在則通過計算(compute屬性)

  • partitions方法:獲取RDD的分區數據

partitions方法查找分區數組的優先級為:從CheckPoint查找-->讀取partition_屬性-->調用getPartitions方法獲取。

  • preferredLocations方法:優先調用Checkpoint中保存的RDD的getPreferredLocations方法獲取指定分區的優先位置,當沒有保存Checkpoint時,調用自身的getPreferredLocations方法獲取指定分區的優先位置。
  • dependencies方法:獲取當前RDD的所有依賴序列

dependencies方法執行的步驟如下:

1、從CheckPoint中獲取RDD,並將這些RDD封裝為OneToOneDependency列表。如果從CheckPoint中獲取到RDD的依賴,則返回RDD的依賴,否則進去下一步。

2、如果dependencies_等於null,那麼調用子類實現的getDependencies方法獲取當前RDD的依賴後賦予dependencies_,最後返回dependencies_。

其他方法

  • context方法:返回_sc(即SparkContext)
  • getStorageLevel方法:返回當前RDD的StorageLevel
  • getNarrowAncestors方法:獲取當前RDD的祖先依賴中屬於窄依賴的RDD序列

RDDInfo

RDDInfo用於描述RDD的信息,RDDInfo提供的信息有:

  • id:RDD的id。
  • name:RDD的名稱。
  • numPartitions:RDD的分區數量。
  • storageLevel:RDD的存儲級別。
  • parentIds:RDD的父RDD的id序列。一個RDD會有零到多個父RDD。
  • callSite:RDD的用戶調用棧信息。
  • scope:RDD的操作範圍。scope的類型是RDDOperationScope,每一個RDD都有一個RDDOperationScope。RDDOperationScope與Stage或Job之間並無特殊關係,一個RDDOperationScope可以存在於一個Stage內,也可以跨越多個Job。
  • numCachedPartitions:緩存的分區數量。
  • memSize:使用的內存大小。
  • diskSize:使用的磁盤大小。
  • externalBlockStoreSize: Block存儲在外部大小。

方法

  • isCached方法:是否已經緩存。
  • compare方法:由於RDDInfo繼承Ordered,所以重寫compare方法用於排序。

伴生對象

定義了fromRdd方法,用於從RDD構建出對應的RDDInfo。具體執行步驟如下:

1、獲取當前RDD的名稱(即name屬性)作為RDDInfo的name屬性,如果RDD還沒有名稱,那麼調用Utils工具類的getFormattedClassName方法生成RDDInfo的name屬性。

2、獲取當前RDD依賴的所有父RDD的身份標識作為RDDInfo的parentIds屬性。

3、創建RDDInfo對象。

RDDInfo源碼,位於org.apache.spark.storage.RDDInfo


分享到:


相關文章: