重讀Spark~RDD

RDD(resilient distributed dataset,彈性分佈式數據集),是Spark框架的基本計算單元,是一個不可修改的分佈式對象集合。每個RDD由多個分區組成,本身不存放數據,只存放數據的引用,操作 RDD 就像操作本地集合一樣,而每個分區可以在不同的節點上做計算。RDD是隻讀的,只能由一個RDD轉換到另一個RDD,可以通過Spark豐富的算子函數實現。RDD特點:

  • 有一個分片列表。只有能夠切分的數據才能並行計算。
  • 有一個函數計算每一個分片。
  • 對其他的RDD的可能有依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。
  • 可選:key-value型的RDD是根據哈希來分區的,類似於mapreduce當中的Paritioner接口,控制key分到哪個reduce。
  • 可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。

分區

RDD邏輯上是分區的,是抽象的概念,在開始計算的時候會通過compute函數來得到對應每個分區的數據。如果RDD是通過文件系統中的文件構建出的,那麼compute函數會讀取指定文件系統路徑中的數據;如果RDD是通過其他RDD轉換過來的,則將其他RDD處理好的數據進行轉換處理;

依賴

RDD通過算子操作進行轉換數據,轉換得到的新RDD包含了從其他RDD衍生過來的必要內容,RDD之間維護著血緣關係。

緩存

如果多次使用同一個RDD,則可以將其緩存起來。那麼該RDD只有在第一次計算的時候才會根據血緣關係得到分區的數據,在後續步驟中會直接從緩存處取數據。

CheckPoint

雖然RDD在數據丟失或任務失敗時可以根據血緣關係進行回溯重建,但是對於一個長時間長流程的處理任務來說,隨著時間的延長,RDD之間的血緣依賴關係也會越來越長。

一旦出錯,如果要從血緣關係進行恢復的話,系統性能肯定會受到影響。因此,RDD還支持CheckPoint功能將數據進行持久化保存,CheckPoint之後的RDD不再需要知道其父RDD的數據情況,當發生異常,可以直接從CheckPoint獲取數據。

RDD主要有三種創建方式:從已經存在的集合創建、從文件系統創建、從已有的RDD轉換。RDD算子主要分為兩類:Transformation算子和Action算子。

  • Transformation:根據RDD創建一個新的RDD,這類算子都是延遲加載的,它們不會直接執行計算,只是記住轉換動作。只有要求返回結果給Driver時,這些轉換才會真正運行。
  • Action:對RDD計算後返回一個結果給Driver,會直接執行計算。

Transformation算子

  • map(f: T => U) 對每個元素執行函數計算,返回MappedRDD[U]。
  • flatmap(f: T => TraversableOnce[U]) 首先對每個元素執行函數計算,然後將結果展平,返回FlatMappedRDD[U]。
  • filter(f: T => Boolean) 保留函數計算結果為true的元素,返回FilteredRDD[T]。
  • mapPartitions(Iterator[T] => Iterator[U]) 對每個分區執行函數計算,返回MapPartitionsRDD[U]。
  • union(otherRdd[T]) 對源RDD和參數RDD求並集,返回UnionRDD[T]。
  • distinct([numTasks]) 對源RDD進行去重後,返回新的RDD。numTasks表示並行任務的數量,默認為8。
  • partitionBy() 對RDD進行分區操作,如果原有的partionRDD和現有的partionRDD數量一致就不進行分區,否則會生成ShuffledRDD。
  • reduceByKey(f: (V, V) => V) 在(K,V)類型的RDD上調用,使用指定的reduce函數將key相同的value聚合到一起,返回ShuffledRDD[(K, V)]。同一臺機器上的鍵值對在移動前會先組合,適合大數據複雜計算。
  • groupByKey() 在(K,V)類型的RDD上調用,同樣對每個key進行操作,但只生成一個sequence,返回ShuffledRDD[([K], Iterable[V])]。所有的鍵值對都會被移動,網絡開銷大,不適合大數據複雜計算。
  • sortByKey([ascending], [numTasks]) 在(K,V)類型的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD。
  • repartition(numPartitions) 將RDD數據重新混洗(reshuffle)並隨機分佈到新的分區中,使數據分佈更均衡,新的分區個數取決於numPartitions。該算子總是需要通過網絡混洗所有數據,底層是調用coalesce(numPartitions, shuffle = true)。

Action算子

  • reduce(func) 通過func函數聚合RDD中的所有元素,這個功能必須是可交換且可並聯的。
  • collect() 在Driver中,以數組的形式返回數據集的所有元素。
  • count() 返回RDD的元素個數。
  • saveAsTextFile(path) 將數據集中的元素以textfile的形式保存到指定的目錄下,Spark會調用toString()方法轉換每個元素。
  • foreach(f: T => Unit) 為數據集的每一個元素調用func函數進行處理。
  • foreachPartition(f: Iterator[T] => Unit) 為數據集的每一個分區調用func函數進行處理,可以用來處理數據庫連接等操作。

子RDD與父RDD的血統依賴關係有兩種:寬依賴和窄依賴。窄依賴(narrow dependency)指的是父RDD的每個Partition最多被子RDD的一個Partition使用,如map、filter、union等操作都會產生窄依賴。寬依賴(wide dependency)指的是子RDD的多個Partition依賴於父RDD的同一個Partition,如groupByKey、reduceByKey、sortByKey等操作都會產生寬依賴。

DAG又稱有向無環圖,描述了多個RDD之間的轉換關係,子RDD和父RDD之間存在引用依賴關係,最後的RDD會觸發action操作——一個DAG構成一個Job任務。

根據寬窄依賴劃分DAG成不同的Stage,Spark劃分Stage的整體思路是:各個RDD之間的依賴關係形成DAG(有向無環圖),DAGScheduler對這些依賴關係形成的DAG進行Stage劃分,從後往前回溯,遇到寬依賴就斷開,劃分為一個Stage,遇到窄依賴就將這個RDD加入該Stage中。完成了Stage的劃分,DAGScheduler基於每個Stage生成TaskSet,並將TaskSet(一組Task)提交給TaskScheduler。TaskScheduler負責具體的任務調度,最後在Worker節點上啟動Task。

RDD緩存方式

RDD通過persist()方法或cache()方法可以緩存前面的計算結果,但並非這兩個方法被調用時立即緩存,而要等觸發後面的Action時才會將該RDD緩存在計算節點的內存中。其中緩存分為多個緩存級別,內存、磁盤或綜合緩存。

CheckPoint

Checkpoint的產生就是為了相對而言更加可靠的持久化數據。Checkpoint可以指定把數據放在本地,並且是多副本的方式;在生產環境中可以把數據放在HDFS上,藉助HDFS天然的高容錯、高可靠的特徵來實現可靠的持久化數據。CheckPoint 會斬斷之前的血緣關係,CheckPoint 後的RDD不知道它的父RDD了,從CheckPoint處直接拿到數據。

RDD緩存和CheckPoint有區別:

  • 存儲位置不同:persist/cache 緩存的數據保存在BlockManager的內存或磁盤上,而CheckPoint的數據保存在HDFS上。
  • 對血統的影響不同:persist/cache 緩存的RDD的不會丟掉血統,可以通過血緣關係重新計算。而CheckPoint執行完之後,已經將當前計算結果安全保存在HDFS上,會斬斷血緣關係。
  • 生命週期不同:persist/cache 緩存的RDD會在程序結束後被清除,Checkpoint保存的RDD在程序結束後會依然存在於HDFS。


分享到:


相關文章: