Spark 彈性分佈式數據集(RDD)

1.RDD簡介

RDD彈性分佈式數據集,一個RDD代表一個被分區的只讀數據集。

RDD是spark的核心數據結構。通過RDD的依賴關係形成spark的調度順序,通過對RDD的操作形成整個spark程序。

RDD的本質:在並行計算的各個階段進行有效的數據共享(減少網絡之間的文件傳輸代價)。

1.1.RDD的容錯處理

由於RDD的接口只支持粗粒度操作(即一個操作會被應用在RDD的所有數據上),所有隻要通過記錄下這些作用在RDD之上的轉換操作,來構建rdd的繼承關係(lineage),就可以有效的進行容錯處理。

1.2.RDD的生成

RDD的生成只有三種途徑:

1.從內存集合

2.外部存儲系統,hdfs(hive ,Cassandra, hbase)

默認 spark為每個hdfs的數據塊創建一個分區,也可以設置較大分區數, 但是不能設置分區數小於數據塊的數量。

使用sparkContext.wholeTextFiles讀取一個包含很多小文件的目錄,返回(文件名,內容)對,相對於textFile 返回文件中每一行作為一條記錄。

為了SequenceFiles文件,使用sc.sequenceFile[K,V],k v是文件中k,v的數據類型。

為了hadoop其他的輸入格式,可以使用sc.hadoopRDD。

一個partition 對應一個task,一個task 必定存在於一個Executor,一個Executor 對應一個JVM.

3.通過轉換操作來自於其他的RDD ,如map,filter,join等。

2.spark RDD接口

partition 分區,一個RDD會有一個或者多個分區

preferredLocations(p) 對於分區p,返回數據本地化計算的節點

dependencies:RDD依賴關係

compute(p,context): 對於分區p,進行迭代計算

partitioner:RDD分區函數

2.1.RDD分區(partitions)

一個RDD包含一個或多個分區,每個分區都有分區屬性,分區的多少決定了對RDD進行並行計算的並行度。

分區決定並行計算的粒度。每個rdd分區的計算操作都在一個單獨的任務中被執行。

一個partition 對應一個task,一個task 必定存在於一個Executor,一個Executor 對應一個JVM.

利用rdd的成員變量partitions返回的數組大小查詢RDD分區數。

#數組轉換成分區,第二個參數是指定分區數

scala> val rdd =sc.parallelize(1 to 100,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :12

#返回分區數

scala> rdd.partitions.size

res2: Int = 2

#查看默認分區數 ()

scala> val rdd2 = sc.parallelize(1 to 100)

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :12

scala> rdd.partitions.size

res5: Int = 2

2.2.RDD優先位置(preferredLocations)

返回此RDD的每個分區所存儲的位置。(用於在spark進行任務調度的時候,儘可能將任務分配到數據塊所存儲的位置)

依從hadoop讀取數據生成RDD為例:preferredLocations返回每個數據塊所在的機器名或IP地址,如果每一塊數據是多份存儲,則返回多個機器地址。

#讀取hdfs生成一個類型為MappedRDD的RDD (一個文件生成一個RDD).

scala> val rdd = sc.textFile("hdfs://yarn1:8020/hmbbs_logs/access_2013_05_31.log")

16/04/27 21:45:41 INFO MemoryStore: ensureFreeSpace(219256) called with curMem=0, maxMem=311387750

16/04/27 21:45:41 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 214.1 KB, free 296.8 MB)

rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at :12

#通過依賴關係找到原始的rdd

scala> val hadoopRDD = rdd.dependencies(0).rdd

hadoopRDD: org.apache.spark.rdd.RDD[_] = HadoopRDD[3] at textFile at :12

#hadoopRDD分區個數

scala> hadoopRDD.partitions.size

16/04/27 21:46:35 INFO FileInputFormat: Total input paths to process : 1

16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.64:50010

16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.63:50010

16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.62:50010

res7: Int = 2

返回第一分區所在服務器

scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))

res9: Seq[String] = WrappedArray(yarn4, yarn3, yarn2)

2.3.RDD依賴關係

由於RDD是粗粒度的操作數據集,每一個轉換操作都會生成一個新的RDD,所以RDD之間會形成類似於流水線一樣的前後依賴關係,

有兩種類型依賴: 窄依賴和寬依賴。

窄依賴:一個父RDD分區最多隻被子RDD的一個分區所使用

寬依賴:多個子RDD的分區依賴於同一個父RDD的分區。

寬依賴通常是Spark拆分Stage的邊界,在同一個Stage內均為窄依賴。

  • 窄依賴:

每一個父RDD的分區最多隻被子RDD的一個分區所使用,如圖所示:

Spark 彈性分佈式數據集(RDD)

  • 寬依賴

多個子RDD的分區會依賴同一個父RDD的分區,如圖所示:

Spark 彈性分佈式數據集(RDD)

使用 RDD.toDebugString 可以看到整個 logical plan (RDD 的數據依賴關係).

區分兩種依賴關係原因:

org.apache.spark.OneToOneDependency(窄依賴)

org.apache.spark.ShuffleDependency(寬依賴)

scala> val rdd = sc.makeRDD(1 to 10 )

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :12

#map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。

#將數據轉成K-V格式.

scala> val mapRDD = rdd.map(x => (x,x))

scala> mapRDD.dependencies

res14: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@22c3119a)

scala> val shuffleRDD = mapRDD.partitionBy(new org.apache.spark.HashPartitioner(3))

shuffleRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at partitionBy at :16

scala> shuffleRDD.dependencies

res16: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5a6bded6)

2.4.RDD分區計算(compute)

spark中每個RDD計算都是以分區為單位,而RDD中的compute函數都是在對迭代器進行復合,不需要保存每次的計算結果。

compute函數只返回相應分區數據的迭代器.

基於RDD的每一個分區,執行compute操作。

對於HadoopRDD來說,compute中就是從HDFS讀取分區中數據塊信息。

對於JdbcRDD來說,就是連接數據庫,執行查詢,讀取每一條數據。

scala> val rdd = sc.parallelize(1 to 10 ,2)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :12

scala> val map_rdd = rdd.map(a => a+ 1)

map_rdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[9] at map at :14

scala> val filter_rdd = map_rdd.filter(a => (a >3))

filter_rdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[10] at filter at :16

scala> val context = new org.apache.spark.TaskContext(0,0,0)

context: org.apache.spark.TaskContext = org.apache.spark.TaskContext@7fc24284

#返回分區1的迭代器

scala> val iter0 = filter_rdd.compute(filter_rdd.partitions(0),context)

iter0: Iterator[Int] = non-empty iterator

scala> iter0.toList

res17: List[Int] = List(4, 5, 6)

#返回分區2的迭代器

scala> val iter1 = filter_rdd.compute(filter_rdd.partitions(1),context)

iter1: Iterator[Int] = non-empty iterator

scala> iter1.toList

res18: List[Int] = List(7, 8, 9, 10, 11)

2.5.RDD分區函數(partitioner)

partitioner就是spark的分區函數,spark實現兩種分區函數: HashPatition 和RangePatitoner,且partitioner屬性只存在於K-V類型的RDD中,對於非K-V類型的partitioner的值是None。

分區函數決定了rdd本身的分區數量,也可作為其父rddshuffle輸出中每個分區進行數據分割的依據。

#HashPartitioner案例:

scala> val rdd = sc.makeRDD(1 to 10, 2).map(x => (x,x))

rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[1] at map at :12

scala> rdd.partitioner

res0: Option[org.apache.spark.Partitioner] = None

#最終被分為3個分區,根據key % 3 分到不同分區

scala> val group_rdd = rdd.groupByKey(new org.apache.spark.HashPartitioner(3))

scala> group_rdd.partitioner

res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@796d413c)

#行動操作,查看每個分區內的值

scala> group_rdd.collectPartitions()

Spark 彈性分佈式數據集(RDD)

3.RDD屬性:

通過RDD的內部屬性,可以獲取相應的元數據信息,通過這些元數據信息可以支持更負責的算法或優化。

分區列表: 通過分區列表可以找到一個RDD中包含的所有分區及其所在地址。

計算每個分片的函數:

對父RDD的依賴列表:

對key-value對數據類型RDD的分區器:控制分區策略和分區數。

每個數據分區的地址列表: 如果數據有副本,通過地址列表可以獲取單個數據塊的所有副本地址,為負載均衡和容錯提供支持。

4.spark計算工作流

spark工作流: 輸入,運行轉換,輸出。

在運行轉換中通過算子對RDD進行轉換。算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。

4.1.輸入:

在spark程序運行中,數據從外部數據空間輸入到spark,數據就進入了spark運行時數據空間,會轉化為spark中的數據塊,通過blockManager進行管理。

4.2.運行轉換:

在spark數據輸入形成RDD後,變可以通過轉換算子等,對數據操作並將RDD轉化成新的RDD,通過運行算子,觸發spark提交作業。如果數據需要複用,可以通過cache算子,將數據緩存到內存。

4.3.輸出:

程序運行結束數據會輸出spark運行時空間,存儲到分佈式存儲中(saveAsTextFile輸出到HDFS)或scala數據或集合中(collect 輸出到scala集合,count返回scala int型數據)。

5.RDD分區說明

RDD作為一個分佈式的數據集,是分佈在多個worker節點上的。如下圖所示,RDD1有五個分區(partition),他們分佈在了四個worker nodes 上面,RDD2有三個分區,分佈在了三個worker nodes上面。

Spark 彈性分佈式數據集(RDD)

想要重新給rdd分區,直接調用rdd.repartition方法就可以了,如果想具體控制哪些數據分佈在哪些分區上,可以傳一個Ordering進去。比如說,我想要數據隨機地分佈成10個分區,可以:

class MyOrdering[T] extends Ordering[T]{ def compare(x:T,y:T) = math.random compare math.random}// 假設數據是Int類型的rdd.repartition(10)(new MyOrdering[Int]) 

6.RDD操作

rdd有兩種類型操作: transformations 和actions

transformations :轉換RDD

action 操作:運行計算返回結果給驅動程序。

transformation 是懶操作,只有當action需要給driver 程序返回結果時才執行。


分享到:


相關文章: