Hello Spark!

歡迎閱讀美圖數據技術團隊的「Spark,從入門到精通」系列文章,本系列文章將由淺入深為大家介紹 Spark,從框架入門到底層架構的實現,相信總有一種姿勢適合你,歡迎大家持續關注:)

/ 什麼是 Spark? /


Spark 是 UC Berkeley AMP lab 所開源的類 Hadoop MapReduce 的通用並行框架,是專為大規模數據處理而設計的快速通用的大數據處理引擎及輕量級的大數據處理統一平臺。

當我們在談 Spark 的時候可能是指一個 Spark 應用程序,替代 MapReduce 運行在 Yarn上,存儲在 HDFS 上的一個大數據批處理程序;也可能是指使用包含 Spark sql、Spark streaming 等子項目;甚至 Tachyon、Mesos 等大數據處理的統一平臺,或者稱為 Spark 生態。


Hello Spark! | Spark,從入門到精通


圖 1


發展至今,Spark 已不僅僅是 MapReduce 的替換方案,它已經發出成了一個包含眾多子項目的 Spark 生態。如圖 1 所示,Spark 生態可分為四層:

  • 數據存儲層,以 HDFS 、Tachyon 為代表的一些分佈式文件存儲系統或各種數據庫;
  • 資源管理層,Yarn、Mesos 等資源管理器;
  • 數據處理引擎;
  • 應用層,以 Spark 為基礎產生的眾多項目;


Spark SQL 提供 HiveQL(通過 Apache Hive 的 SQL 變體 Hive 查詢語言)與Spark 進行交互的 API。每個數據庫表被當做一個 RDD,Spark SQL 查詢被轉換為 Spark 操作。Spark Streaming 對實時數據流進行處理和控制,它允許程序能夠像普通 RDD 一樣處理實時數據。

接下來的系列文章將會詳細介紹 Spark 生態中的其他模塊與各個子項目,接下來將通過與 MapReduce 的對比來介紹數據處理引擎Spark的特點及其原理。

/ Spark 的特點 /


根據谷歌和百度的搜索結果顯示,Spark 的搜索趨勢已與 Hadoop 持平甚至趕超,標誌著 Spark 已經成為計算部分的事實標準,也就是說大數據技術繞不開 Spark 了。

在大數據的存儲、計算、資源調度中,Spark 主要解決計算問題,即主要替代 Mapreduce 的功能,底層存儲和資源調度很多公司仍然選擇使用 HDFS、Yarn 來承載。為什麼眾多企業在 Hadoop 生態框架裡都選擇用 Spark 作為處理引擎?讓我們仔細看看它有什麼特點。

1.速度快。Spark 基於內存進行計算( 也有部分計算基於磁盤) ;

2.容易上手開發。 Spark 基於 RDD 的計算模型, 比 Hadoop 基於 Map-Reduce 的計算模型要更易於理解、易於上手開發實現各種複雜功能,如二次排序、 topN 等複雜操作時更加便捷。;

3.超強的通用性。 Spark 提供了 Spark RDD、 Spark SQL、 Spark Streaming、 Spark MLlib、 Spark GraphX 等技術組件, 可以一站式地完成大數據領域的離線批處理、 交互式查詢、 流式計算、 機器學習、圖計算等常見的任務;

4.集成 Hadoop。 Spark 可以完美集成 Hadoop。 Hadoop 的 HDFS、 Hive、HBase 負責存儲, Yarn 負責資源調度, Spark 負責大數據計算是比較流行的大數據解決方案。

4.極高的活躍度。 Spark 目前是 Apache 基金會的頂級項目, 全世界有大量的優秀工程師是 Spark 的 committer, 並且世界上很多頂級的 IT 公司都在大規模地使用Spark。

看看同樣是負責計算問題的 MapReduce,如圖 2 所示是 MapReduce 計算 WordCount。


Hello Spark! | Spark,從入門到精通


圖 2


MapReduce 解決了大數據處理中多種場景問題,但是它的侷限性也很明顯:

  • MapReduce 只提供 Map 和 Reduce 兩個操作,欠缺表達力,複雜的計算需要大量的 Job 才能完成。
  • 中間結果也放在 HDFS 文件系統中,迭代計算的話效率很低。
  • 適用 Batch 數據處理,對於交互式數據處理而言實時數據處理的支持不夠。
  • 需要寫很多底層代碼,難上手。如上所示的 WordCount 程序至少需要三個 java 類:Map 類、Reduce 類、Job 類,這裡不詳細列出。


許多項目針對它的侷限性進行了改進(如 Tez 等),接著看圖 3 中 Spark 的具體操作流程:


Hello Spark! | Spark,從入門到精通


圖 3

首先我們可以看到 Spark 提供了豐富的算子(textFile、FlatMap、Map、ReduceByKey 等),在計算的中間結果也沒有存儲到 HDFS 的操作。然後,對於上圖的 WordCount 程序,Spark 只需要如下一行代碼:

sc.textFile(s"${path}").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).saveAsTextFile("hdfs://xxx")


圖 4 列舉了 Spark 和 MapReduce 作為數據處理引擎的一些對比。值得一提的是關於數據處理的規模,Spark 在誕生後,社區裡有很多質疑 Spark 處理數據規模的聲音,隨後官方給出了對於一 PB 數據排序的實驗,並且處理時間打破了當時的記錄。但我們也不能忽視,在實際生產過程中,我們面對的不是一個程序或者一個任務,在同一個集群,如果有很多的 Spark 程序沒有得到很好的優化,會浪費大量的內存,從而讓一些程序需要排隊等待,在這種情況下,Spark 處理的數據規模可能會小於 MapReduce 處理的數據規模。(之後的系列文章也會介紹關於 Spark 內存調優的相關內容)


Hello Spark! | Spark,從入門到精通


圖 4


關於最後一點容錯性,MapReduce 中每一步操作的結果都會被存入磁盤,在計算出現錯誤時可以很好的從磁盤進行恢復;Spark 則需要根據 RDD 中的信息進行數據的重新計算,會耗費一定的資源。Spark 提供兩種方式進行故障恢復:通過數據的血緣關係再執行一遍前面的處理;Checkpoint 將數據集存儲到持久存儲中。理論上如果選擇在每個完成的小步驟上加 CheckPoint,Spark 的容錯性能可以和 MR 達到一樣的穩健。當然,很少有人會這麼做。

我們通過 Spark 與 MapReduce 對比。看到了 Spark 對 MapReduce 侷限性的改進,還有它快速、通用的特點。接下來將通過 Spark 的設計思想和執行過程來說明它為什麼可以做到這些特點。

/ Spark 的基本原理 /

Hello Spark! | Spark,從入門到精通


圖 5


如圖 5 所示,在 Spark 集群中由一個節點作為 driver 端創建 SparkContext。Spark 應用程序的入口負責調度各個運算資源,協調各個 Worker Node上 的 Executor。根據用戶輸入的參數會產生若干個 workr,workr 節點運行若干個 executor,一個 executor 是一個進程,運行各自的 task,每個 task 執行相同的代碼段處理不同的數據。


Hello Spark! | Spark,從入門到精通


圖 6

如圖 6 所示是 Spark 的具體執行過程,client 提交作業,通過反射 invoke 執行用戶代碼 main 函數,之後開始啟動 CoarseGrainedExecutorBackend 和初始化 SparkContext。

*SparkContext 初始化包括初始化監控頁面 SparkUI、執行環境 SparkEnv、安全管理器 SecurityManager、stage 劃分及調度器 DAGScheduler、task 作業調度器 TaskSchedulerImpl 、與 Executor 通信的調度端 CoarseGrainedSchedulerBackend。

DAG Scheduler 將作業劃分後,依次提交 stage 對應的 taskSet 給 TaskSchedulerImpl,TaskSchedulerImpl 會 submit taskset 給 driver 端的 CoarseGrainedSchedulerBackend 後端,接著 CoarseGrainedSchedulerBackend 會一個一個的 LaunchTask。在遠端的 CoarseGrainedExecutorBackend 接收到 task 提交 event 後,會調用 Executor 執行 task,最終 task 是在 TaskRunner 的 run 方法內運行。

那麼在過程 4 中 DAG Scheduler 如何劃分作業?如果產生 stage、task 等給 Executor 執行呢?接著我們看作業劃分執行的示例。


Hello Spark! | Spark,從入門到精通


圖 7

圖 7 描述了一個 Spark 程序,從 HDFS 上讀取數據產生 RDD-A 然後 flatmap 操作到 RDD-B,讀取另一部分數據的到RDD-C,然後 map 操作的到 RDD-D,RDD-D 聚合操作 RDD-E,RDD-B 和 RDD-E 加入後得到 RDD-F,然後再將結果存儲到 HDFS 上。

Spark 根據 RDD 之間的不同點依賴關係切分成不同的階段(Stage),途中有四個階段,其中 Stage0和 Stage2 由於沒有依賴關係是可以並行執行的。但Stage2需要等待Stage1執行完畢。RDD-D 到 RDD- F 的聚合操作以及 Stage0 和 Stage2 得到的 RDD- B 和 RDD-E join在一起的到 RDD-F,這個過程會產生 shaffle。沒有依賴關係的Stage是可以並行執行的,但是對於job,Spark是串行執行的,如果想要並行執行Job,可以在Spark程序中進行多線程編程。

在這個 DAG 圖中,Spark 能夠充分了解數據之間的血緣關係,這樣某些任務失敗後可以根據血緣關係重新執行計算獲取失敗了的 RDD。

*寬依賴和窄依賴

窄依賴是指父RDD的每個分區只被子RDD的一個分區所使用,子RDD分區通常對應常數個父RDD分區;

寬依賴是指父RDD的每個分區都可能被多個子RDD分區所使用,子RDD分區通常對應所有的父RDD分區。這個概念在下面的例子中會涉及。

Spark 提供了豐富的算子,操作也更加通用。那麼這種劃分作業、執行並行計算的方案如何使 Spark 產生基於內存計算的快速效果呢?都說 Spark 擅長迭代計算,那麼我們通過一個經典的迭代問題 PageRank 算法來與 MapReduce 比較一下。


Hello Spark! | Spark,從入門到精通


圖 8,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos


圖 8 是 MapReduce 進行 pagerank 算法的一次迭代過程,需要注意的是灰色的部分都是需要存儲到磁盤的數據。


Hello Spark! | Spark,從入門到精通


圖 9 ,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos


圖 9 所示是 Spark 執行 pageRank 算法的一次迭代過程,相較於 MapReduce 做了很多改進。首先在內存足夠的情況下 Spark 允許用戶將常用的數據緩存到內存中,加快了系統的運行速度;其次 Spark 對數據之間的依賴關係有了明確的劃分,根據寬依賴與窄依賴關係進行任務的調度,可以實現管道化操作,使系統靈活性得以提高。


Hello Spark! | Spark,從入門到精通


圖 10:MapReduce 進行 pagerank 算法的二次迭代,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos


Hello Spark! | Spark,從入門到精通


圖 11:Spark 進行 pagerank 算法的二次迭代,via http://www.jos.org.cn/jos/ch/reader/create_pdf.aspx?file_no=5557&journal_id=jos


如圖所示 Spark 可以將具有窄依賴關係的 RDD 分區分配到一個任務中,進行管道化操作,任務內部數據無需通過網絡傳輸且任務之間互不干擾,因此 Spark 兩次迭代只有三次 shuffle。

在一次迭代過程中,MapReduce 與 Spark 在性能上可能並沒有很大的差別,但是隨著迭代次數的增加,兩者的差距逐漸顯現出來。Spark 根據依賴關係採用的任務調度策略使得 shuffle 次數相較於 MapReduce 有了顯著降低,因此 Spark 的設計十分適合進行迭代運算。

回顧本篇文章,我們依次從概念、特點及原理三個角度初步介紹了 Spark,下一篇我們將具體介紹 Spark on Yarn 的運作流程與機制,敬請期待。

附:Spark 相關術語表

Hello Spark! | Spark,從入門到精通



分享到:


相關文章: