Spark,從入門到精通

歡迎閱讀美圖數據技術團隊的「Spark,從入門到精通」系列文章,本系列文章將由淺入深為大家介紹 Spark,從框架入門到底層架構的實現,相信總有一種姿勢適合你,歡迎大家持續關注:)

/ 什麼是 RDD? /

傳統的 MapReduce 雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是在迭代計算式的時候,要進行大量的磁盤 IO 操作,而 RDD 正是解決這一缺點的抽象方法。RDD(Resilient Distributed Datasets)即彈性分佈式數據集,從名字說起:

彈性

當計算過程中內存不足時可刷寫到磁盤等外存上,可與外存做靈活的數據交換;

RDD 使用了一種“血統”的容錯機制,在結構更新和丟失後可隨時根據血統進行數據模型的重建;

分佈式

就是可以分佈在多臺機器上進行並行計算;

數據集

一組只讀的、可分區的分佈式數據集合,集合內包含了多個分區。分區依照特定規則將具有相同屬性的數據記錄放在一起,每個分區相當於一個數據集片段。

RDD 內部結構


RDD原理與基本操作 | Spark,從入門到精通


圖 1


圖 1 所示是 RDD 的內部結構圖,它是一個只讀、有屬性的數據集。它的屬性用來描述當前數據集的狀態,數據集由數據的分區(partition)組成,並由(block)映射成真實數據。RDD 的主要屬性可以分為 3 類:與其他 RDD 的關係(parents、dependencies);數據(partitioner、checkpoint、storage level、iterator 等);RDD 自身屬性(sparkcontext、sparkconf),接下來我們根據屬性分類來深入介紹各個組件。

RDD 自身屬性

從自身屬性說起,SparkContext 是 Spark job 的入口,由 Driver 創建在 client 端,包括集群連接、RDD ID、累加器、廣播變量等信息。SparkConf 是參數配置信息,包括:

  • Spark api,控制大部分的應用程序參數;
  • 環境變量,配置IP地址、端口等信息;
  • 日誌配置,通過 log4j.properties 配置。


數據

RDD 內部的數據集合在邏輯上和物理上被劃分成多個小子集合,這樣的每一個子集合我們將其稱為分區(Partitions),分區的個數會決定並行計算的粒度,而每一個分區數值的計算都是在一個單獨的任務中進行的,因此並行任務的個數也是由 RDD分區的個數決定的。但事實上 RDD 只是數據集的抽象,分區內部並不會存儲具體的數據。Partition 類內包含一個 index 成員,表示該分區在 RDD 內的編號,通過 RDD 編號+分區編號可以確定該分區對應的唯一塊編號,再利用底層數據存儲層提供的接口就能從存儲介質(如:HDFS、Memory)中提取出分區對應的數據。

RDD 的分區方式主要包含兩種:Hash Partitioner 和 Range Partitioner,這兩種分區類型都是針對 Key-Value 類型的數據,如是非 Key-Value 類型則分區函數為 None。Hash 是以 Key 作為分區條件的散列分佈,分區數據不連續,極端情況也可能散列到少數幾個分區上導致數據不均等;Range 按 Key 的排序平衡分佈,分區內數據連續,大小也相對均等。

Preferred Location 是一個列表,用於存儲每個 Partition 的優先位置。對於每個 HDFS 文件來說,這個列表保存的是每個 Partition 所在的塊的位置,也就是該文件的「劃分點」。

Storage Level 是 RDD 持久化的存儲級別,RDD 持久化可以調用兩種方法:cache 和 persist:persist 方法可以自由的設置存儲級別,默認是持久化到內存;cache 方法是將 RDD 持久化到內存,cache 的內部實際上是調用了persist 方法,由於沒有開放存儲級別的參數設置,所以是直接持久化到內存。


RDD原理與基本操作 | Spark,從入門到精通


圖 2


如圖 2 所示是 Storage Level 各級別分佈,那麼如何選擇一種最合適的持久化策略呢?默認情況下,性能最高的當然是 MEMORY_ONLY,但前提是你的內存必須足夠大到可以綽綽有餘地存放下整個 RDD 的所有數據。因為不進行序列化與反序列化操作,就避免了這部分的性能開銷;對這個RDD的後續算子操作,都是基於純內存中的數據的操作,不需要從磁盤文件中讀取數據,性能也很高;而且不需要複製一份數據副本,並遠程傳送到其他節點上。但是這裡必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果 RDD 中數據比較多時(比如幾十億),直接用這種持久化級別,會導致 JVM 的 OOM 內存溢出異常。

如果使用 MEMORY_ONLY 級別時發生了內存溢出,那麼建議嘗試使用 MEMORY_ONLY_SER 級別。該級別會將 RDD 數據序列化後再保存在內存中,此時每個 partition 僅僅是一個字節數組而已,大大減少了對象數量,並降低了內存佔用。這種級別比 MEMORY_ONLY 多出來的性能開銷主要就是序列化與反序列化的開銷,但是後續算子可以基於純內存進行操作,因此性能總體還是比較高的。但可能發生 OOM 內存溢出的異常。

如果純內存的級別都無法使用,那麼建議使用 MEMORY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。因為既然到了這一步,就說明 RDD 的數據量很大,內存無法完全放下,序列化後的數據比較少,可以節省內存和磁盤的空間開銷。同時該策略會優先儘量嘗試將數據緩存在內存中,內存緩存不下才會寫入磁盤。

通常不建議使用 DISK_ONLY 和後綴為_2 的級別:因為完全基於磁盤文件進行數據的讀寫,會導致性能急劇降低。後綴為_2的級別,必須將所有數據都複製一份副本,併發送到其他節點上,數據複製以及網絡傳輸會導致較大的性能開銷。

Checkpoint 是 Spark 提供的一種緩存機制,當需要計算依賴鏈非常長又想避免重新計算之前的 RDD 時,可以對 RDD 做 Checkpoint 處理,檢查 RDD 是否被物化或計算,並將結果持久化到磁盤或 HDFS 內。Checkpoint 會把當前 RDD 保存到一個目錄,要觸發 action 操作的時候它才會執行。在 Checkpoint 應該先做持久化(persist 或者 cache)操作,否則就要重新計算一遍。若某個 RDD 成功執行 checkpoint,它前面的所有依賴鏈會被銷燬。

與 Spark 提供的另一種緩存機制 cache 相比:cache 緩存數據由 executor 管理,若 executor 消失,它的數據將被清除,RDD 需要重新計算;而 checkpoint 將數據保存到磁盤或 HDFS 內,job 可以從 checkpoint 點繼續計算。Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當於 cache 到磁盤上,這樣可以使 RDD 第一次被計算得到時就存儲到磁盤上,它們之間的區別在於:persist 雖然可以將 RDD 的 partition 持久化到磁盤,但一旦作業執行結束,被 cache 到磁盤上的 RDD 會被清空;而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,如果不被手動 remove 掉,是一直存在的。

Compute 函數實現方式就是向上遞歸「獲取父 RDD 分區數據進行計算」,直到遇到檢查點 RDD 獲取有緩存的 RDD。

Iterator 用來查找當前 RDD Partition 與父 RDD 中 Partition 的血緣關係,並通過 Storage Level 確定迭代位置,直到確定真實數據的位置。它的實現流程如下:

  • 若標記了有緩存,則取緩存,取不到則進行 computeOrReadCheckpoint(計算或讀檢查點)。完了再存入緩存,以備後續使用。
  • 若未標記有緩存,則直接進行 computeOrReadCheckpoint。
  • computeOrReadCheckpoint 這個過程也做兩個判斷:有做過 checkpoint 和沒有做過 checkpoint,做過 checkpoint 則可以讀取到檢查點數據返回,沒做過則調該 RDD 的實現類的 compute 函數計算。


血統關係

一個作業從開始到結束的計算過程中產生了多個 RDD,RDD 之間是彼此相互依賴的,我們把這種父子依賴的關係稱之為「血統」。

RDD 只支持粗顆粒變換,即只記錄單個塊(分區)上執行的單個操作,然後創建某個 RDD 的變換序列(血統 lineage)存儲下來。

*變換序列指每個 RDD 都包含了它是如何由其他 RDD 變換過來的以及如何重建某一塊數據的信息。

因此 RDD 的容錯機制又稱「血統」容錯。 要實現這種「血統」容錯機制,最大的難題就是如何表達父 RDD 和子 RDD 之間的依賴關係。


RDD原理與基本操作 | Spark,從入門到精通


圖 3


如圖 3 所示,父 RDD 的每個分區最多隻能被子 RDD 的一個分區使用,稱為窄依賴(narrow dependency);若父 RDD 的每個分區可以被子 RDD 的多個分區使用,稱為寬依賴(wide dependency)。簡單來講,窄依賴就是父子RDD分區間「一對一」的關係,而寬依賴就是「一對多」關係。從失敗恢復來看,窄依賴的失敗恢復起來更高效,因為它只需找到父 RDD 的一個對應分區即可,而且可以在不同節點上並行計算做恢復;寬依賴牽涉到父 RDD 的多個分區,需要得到所有依賴的父 RDD 分區的 shuffle 結果,恢復起來相對複雜些。


RDD原理與基本操作 | Spark,從入門到精通


圖 4


根據 RDD 之間的寬窄依賴關係引申出 Stage 的概念,Stage 是由一組 RDD 組成的執行計劃。如果 RDD 的衍生關係都是窄依賴,則可放在同一個 Stage 中運行,若 RDD 的依賴關係為寬依賴,則要劃分到不同的 Stage。這樣 Spark 在執行作業時,會按照 Stage 的劃分, 生成一個最優、完整的執行計劃。

/ RDD 的創建方式與分區機制 /


RDD 的創建方式

RDD 的創建方式有四種:

1.使用程序中的集合創建 RDD,RDD 的數據源是程序中的集合,通過 parallelize 或者 makeRDD 將集合轉化為 RDD;

*例

val num = Array(1,2,3,4,5)
val rdd = sc.parallelize(num)


2.使用本地文件或 HDFS 創建 RDD,RDD 的數據源是本地文件系統或 HDFS 的數據,使用 textFile 方法創建RDD。

*例

val rdd = sc.textFile(“hdfs://master:9000/rec/data”)


3.使用數據流創建 RDD,使用 Spark Streaming 的相關類,接收實時的輸入數據流創建 RDD(數據流來源可以是 kafka、flume 等)。

*例

val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream(“localhost”, 9999)
val words = lines.flatMap(_.split(“ ”))


4.使用其他方式創建 RDD,從其他數據庫上創建 RDD,例如 Hbase、MySQL 等。

*例

val sqlContext = new SQLContext(sc)
val url = "jdbc:mysql://ip:port/xxxx"
val prop = new Properties()
val df = sqlContext.read.jdbc(url, “play_time”, prop)


RDD 的分區機制

RDD 的分區機制有兩個關鍵點:一個是關鍵參數,即 Spark 的默認併發數 spark.default.parallelism;另一個是關鍵原則,RDD 分區儘可能使得分區的個數等於集群核心數目。

當配置文件 spark-default.conf 中顯式配置了 spark.default.parallelism,那麼 spark.default.parallelism=配置的值,否則按照如下規則進行取值:

1.本地模式(不會啟動 executor,由 SparkSubmit 進程生成指定數量的線程數來併發)

spark-shell spark.default.parallelism = 1spark-shell --master local[N] spark.default.parallelism = N (使用 N 個核)spark-shell --master local spark.default.parallelism = 1

2.偽集群模式(x 為本機上啟動的 executor 數,y 為每個 executor 使用的 core 數,z 為每個 executor 使用的內存)

spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

3.Yarn、standalone 等模式

spark.default.parallelism = max(所有 executor 使用的 core 總數,2)

4.Mesos

spark.default.parallelism = 8


spark.context 會生成兩個參數,由 spark.default.parallelism 推導出這兩個參數的值:

sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism, 2)


當 sc.defaultParallelism 和 sc.defaultMinPartitions 確認後,就可以推算 RDD 的分區數了。

  • 以 parallelize 方法為例
val rdd = sc.parallelize(1 to 10)

如果使用 parallelize 方法時沒指定分區數, RDD 的分區數 = sc.defaultParallelism

  • 以 textFile 方法為例
val rdd = sc.textFile(“path/file”)


分區機制分兩種情況:

1.從本地文件生成的 RDD,如果沒有指定分區數,則默認分區數規則為

rdd 的分區數 = max(本地 file 的分片數, sc.defaultMinPartitions)

2.從 HDFS 生成的 RDD,如果沒有指定分區數,則默認分區數規則為:

rdd 的分區數 = max(hdfs 文件的 block 數目, sc.defaultMinPartitions)


/ RDD 的常用操作 /


RDD 支持兩種類型的操作:轉換(Transformation)和動作(Action),轉換操作是從已經存在的數據集中創建一個新的數據集,而動作操作是在數據集上進行計算後返回結果到 Driver,既觸發 SparkContext 提交 Job 作業。轉換操作都具有 Lazy 特性,即 Spark 不會立刻進行實際的計算,只會記錄執行的軌跡,只有觸發行動操作的時候,它才會根據 DAG 圖真正執行。

轉換與動作具體包含的操作種類如下圖所示:


RDD原理與基本操作 | Spark,從入門到精通


圖 5:轉換操作

RDD原理與基本操作 | Spark,從入門到精通


圖 6:動作操作


最後我們通過一段代碼來看看它具體的操作:

RDD原理與基本操作 | Spark,從入門到精通


這段代碼是用來計算某個視頻被男性或女性用戶的播放次數,其中 rdd_attr 用來記錄用戶性別,rdd_src 是用戶對某個視頻進行播放的記錄,這兩個 RDD 會進行一個 join 操作,比如這是某個男性用戶對某個視頻進行了播放,進行 map 操作之後得到視頻 id 和性別作為 key,根據這個 key 做 reduceByKey 的操作,最終得到一個視頻被男性/女性用戶總共播放了多少次的 RDD,然後使用 combineByKey 合併同一個視頻 id 的多個結果,最後保存到 HDFS 上。

附:參考文章

《Spark之深入理解RDD結構》https://blog.csdn.net/u011094454/article/details/78992293《RDD的數據結構模型》https://www.jianshu.com/p/dd7c7243e7f9?from=singlemessage《Spark RDD詳解》https://blog.csdn.net/wangxiaotongfan/article/details/51395769《Spark RDD的默認分區數:(spark 2.1.0)》https://www.jianshu.com/p/4b7d07e754fa《Spark性能優化指南——基礎篇》https://tech.meituan.com/spark_tuning_basic.html


分享到:


相關文章: