1.RDD簡介
RDD彈性分佈式數據集,一個RDD代表一個被分區的只讀數據集。
RDD是spark的核心數據結構。通過RDD的依賴關係形成spark的調度順序,通過對RDD的操作形成整個spark程序。
RDD的本質:在並行計算的各個階段進行有效的數據共享(減少網絡之間的文件傳輸代價)。
1.1.RDD的容錯處理
由於RDD的接口只支持粗粒度操作(即一個操作會被應用在RDD的所有數據上),所有隻要通過記錄下這些作用在RDD之上的轉換操作,來構建rdd的繼承關係(lineage),就可以有效的進行容錯處理。
1.2.RDD的生成
RDD的生成只有三種途徑:
1.從內存集合
2.外部存儲系統,hdfs(hive ,Cassandra, hbase)
默認 spark為每個hdfs的數據塊創建一個分區,也可以設置較大分區數, 但是不能設置分區數小於數據塊的數量。
使用sparkContext.wholeTextFiles讀取一個包含很多小文件的目錄,返回(文件名,內容)對,相對於textFile 返回文件中每一行作為一條記錄。
為了SequenceFiles文件,使用sc.sequenceFile[K,V],k v是文件中k,v的數據類型。
為了hadoop其他的輸入格式,可以使用sc.hadoopRDD。
一個partition 對應一個task,一個task 必定存在於一個Executor,一個Executor 對應一個JVM.
3.通過轉換操作來自於其他的RDD ,如map,filter,join等。
2.spark RDD接口
partition 分區,一個RDD會有一個或者多個分區
preferredLocations(p) 對於分區p,返回數據本地化計算的節點
dependencies:RDD依賴關係
compute(p,context): 對於分區p,進行迭代計算
partitioner:RDD分區函數
2.1.RDD分區(partitions)
一個RDD包含一個或多個分區,每個分區都有分區屬性,分區的多少決定了對RDD進行並行計算的並行度。
分區決定並行計算的粒度。每個rdd分區的計算操作都在一個單獨的任務中被執行。
一個partition 對應一個task,一個task 必定存在於一個Executor,一個Executor 對應一個JVM.
利用rdd的成員變量partitions返回的數組大小查詢RDD分區數。
#數組轉換成分區,第二個參數是指定分區數
scala> val rdd =sc.parallelize(1 to 100,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at
#返回分區數
scala> rdd.partitions.size
res2: Int = 2
#查看默認分區數 ()
scala> val rdd2 = sc.parallelize(1 to 100)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at
scala> rdd.partitions.size
res5: Int = 2
2.2.RDD優先位置(preferredLocations)
返回此RDD的每個分區所存儲的位置。(用於在spark進行任務調度的時候,儘可能將任務分配到數據塊所存儲的位置)
依從hadoop讀取數據生成RDD為例:preferredLocations返回每個數據塊所在的機器名或IP地址,如果每一塊數據是多份存儲,則返回多個機器地址。
#讀取hdfs生成一個類型為MappedRDD的RDD (一個文件生成一個RDD).
scala> val rdd = sc.textFile("hdfs://yarn1:8020/hmbbs_logs/access_2013_05_31.log")
16/04/27 21:45:41 INFO MemoryStore: ensureFreeSpace(219256) called with curMem=0, maxMem=311387750
16/04/27 21:45:41 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 214.1 KB, free 296.8 MB)
rdd: org.apache.spark.rdd.RDD[String] = MappedRDD[4] at textFile at
#通過依賴關係找到原始的rdd
scala> val hadoopRDD = rdd.dependencies(0).rdd
hadoopRDD: org.apache.spark.rdd.RDD[_] = HadoopRDD[3] at textFile at
#hadoopRDD分區個數
scala> hadoopRDD.partitions.size
16/04/27 21:46:35 INFO FileInputFormat: Total input paths to process : 1
16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.64:50010
16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.63:50010
16/04/27 21:46:35 INFO NetworkTopology: Adding a new node: /default/192.168.1.62:50010
res7: Int = 2
返回第一分區所在服務器
scala> hadoopRDD.preferredLocations(hadoopRDD.partitions(0))
res9: Seq[String] = WrappedArray(yarn4, yarn3, yarn2)
2.3.RDD依賴關係
由於RDD是粗粒度的操作數據集,每一個轉換操作都會生成一個新的RDD,所以RDD之間會形成類似於流水線一樣的前後依賴關係,
有兩種類型依賴: 窄依賴和寬依賴。
窄依賴:一個父RDD分區最多隻被子RDD的一個分區所使用
寬依賴:多個子RDD的分區依賴於同一個父RDD的分區。
寬依賴通常是Spark拆分Stage的邊界,在同一個Stage內均為窄依賴。
窄依賴:
每一個父RDD的分區最多隻被子RDD的一個分區所使用,如圖所示:
寬依賴
多個子RDD的分區會依賴同一個父RDD的分區,如圖所示:
使用 RDD.toDebugString
可以看到整個 logical plan (RDD 的數據依賴關係).
區分兩種依賴關係原因:
org.apache.spark.OneToOneDependency(窄依賴)
org.apache.spark.ShuffleDependency(寬依賴)
scala> val rdd = sc.makeRDD(1 to 10 )
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at
#map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
#將數據轉成K-V格式.
scala> val mapRDD = rdd.map(x => (x,x))
scala> mapRDD.dependencies
res14: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@22c3119a)
scala> val shuffleRDD = mapRDD.partitionBy(new org.apache.spark.HashPartitioner(3))
shuffleRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at partitionBy at
scala> shuffleRDD.dependencies
res16: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5a6bded6)
2.4.RDD分區計算(compute)
spark中每個RDD計算都是以分區為單位,而RDD中的compute函數都是在對迭代器進行復合,不需要保存每次的計算結果。
compute函數只返回相應分區數據的迭代器.
基於RDD的每一個分區,執行compute操作。
對於HadoopRDD來說,compute中就是從HDFS讀取分區中數據塊信息。
對於JdbcRDD來說,就是連接數據庫,執行查詢,讀取每一條數據。
scala> val rdd = sc.parallelize(1 to 10 ,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at
scala> val map_rdd = rdd.map(a => a+ 1)
map_rdd: org.apache.spark.rdd.RDD[Int] = MappedRDD[9] at map at
scala> val filter_rdd = map_rdd.filter(a => (a >3))
filter_rdd: org.apache.spark.rdd.RDD[Int] = FilteredRDD[10] at filter at
scala> val context = new org.apache.spark.TaskContext(0,0,0)
context: org.apache.spark.TaskContext = org.apache.spark.TaskContext@7fc24284
#返回分區1的迭代器
scala> val iter0 = filter_rdd.compute(filter_rdd.partitions(0),context)
iter0: Iterator[Int] = non-empty iterator
scala> iter0.toList
res17: List[Int] = List(4, 5, 6)
#返回分區2的迭代器
scala> val iter1 = filter_rdd.compute(filter_rdd.partitions(1),context)
iter1: Iterator[Int] = non-empty iterator
scala> iter1.toList
res18: List[Int] = List(7, 8, 9, 10, 11)
2.5.RDD分區函數(partitioner)
partitioner就是spark的分區函數,spark實現兩種分區函數: HashPatition 和RangePatitoner,且partitioner屬性只存在於K-V類型的RDD中,對於非K-V類型的partitioner的值是None。
分區函數決定了rdd本身的分區數量,也可作為其父rddshuffle輸出中每個分區進行數據分割的依據。
#HashPartitioner案例:
scala> val rdd = sc.makeRDD(1 to 10, 2).map(x => (x,x))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[1] at map at
scala> rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None
#最終被分為3個分區,根據key % 3 分到不同分區
scala> val group_rdd = rdd.groupByKey(new org.apache.spark.HashPartitioner(3))
scala> group_rdd.partitioner
res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@796d413c)
#行動操作,查看每個分區內的值
scala> group_rdd.collectPartitions()
3.RDD屬性:
通過RDD的內部屬性,可以獲取相應的元數據信息,通過這些元數據信息可以支持更負責的算法或優化。
分區列表: 通過分區列表可以找到一個RDD中包含的所有分區及其所在地址。
計算每個分片的函數:
對父RDD的依賴列表:
對key-value對數據類型RDD的分區器:控制分區策略和分區數。
每個數據分區的地址列表: 如果數據有副本,通過地址列表可以獲取單個數據塊的所有副本地址,為負載均衡和容錯提供支持。
4.spark計算工作流
spark工作流: 輸入,運行轉換,輸出。
在運行轉換中通過算子對RDD進行轉換。算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。
4.1.輸入:
在spark程序運行中,數據從外部數據空間輸入到spark,數據就進入了spark運行時數據空間,會轉化為spark中的數據塊,通過blockManager進行管理。
4.2.運行轉換:
在spark數據輸入形成RDD後,變可以通過轉換算子等,對數據操作並將RDD轉化成新的RDD,通過運行算子,觸發spark提交作業。如果數據需要複用,可以通過cache算子,將數據緩存到內存。
4.3.輸出:
程序運行結束數據會輸出spark運行時空間,存儲到分佈式存儲中(saveAsTextFile輸出到HDFS)或scala數據或集合中(collect 輸出到scala集合,count返回scala int型數據)。
5.RDD分區說明
RDD作為一個分佈式的數據集,是分佈在多個worker節點上的。如下圖所示,RDD1有五個分區(partition),他們分佈在了四個worker nodes 上面,RDD2有三個分區,分佈在了三個worker nodes上面。
想要重新給rdd分區,直接調用rdd.repartition方法就可以了,如果想具體控制哪些數據分佈在哪些分區上,可以傳一個Ordering進去。比如說,我想要數據隨機地分佈成10個分區,可以:
class MyOrdering[T] extends Ordering[T]{ def compare(x:T,y:T) = math.random compare math.random}// 假設數據是Int類型的rdd.repartition(10)(new MyOrdering[Int])
6.RDD操作
rdd有兩種類型操作: transformations 和actions
transformations :轉換RDD
action 操作:運行計算返回結果給驅動程序。
transformation 是懶操作,只有當action需要給driver 程序返回結果時才執行。
閱讀更多 從大數據說起 的文章