幾種Spark調優方法

幾種Spark調優方法

Apache Spark 是專為大規模數據處理而設計的快速通用的計算引擎。Spark是UC Berkeley AMP lab (加州大學伯克利分校的AMP實驗室)所開源的類Hadoop MapReduce的通用並行框架,Spark,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是——Job中間輸出結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更好地適用於數據挖掘與機器學習等需要迭代的MapReduce的算法。

1、數據序列化

默認使用的是Java自帶的序列化機制。優點是可以處理所有實現了java.io.Serializable 的類。但是Java 序列化比較慢。可以使用Kryo序列化機制,通常比Java 序列化機制性能高10倍。但是並不支持所有實現了java.io.Serializable 的類。使用 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 開啟Kryo序列化。不使用Kryo做為默認值的原因是:需要註冊自定義的類。例如:

val conf = new SparkConf().setMaster(...).setAppName(...)conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))val sc = new SparkContext(conf)

注意:如果Object很大,需要在配置中增加 spark.kryoserializer.buffer 的值。如果沒有在Kryo中註冊自定義的類,Kryo也能正常工作,這些類會完全地保存下來(等於沒有序列化就進行傳輸或保存了),會造成資源浪費。

2、內存調優

可以考慮3個方面:

  • 對象需要的總內存

  • 指向這些對象的指針

  • GC

通常情況下,指針佔用的空間將是原始數據的2~5倍。有以下幾個原因:

  • Java對象的“object header”(對象頭),包含了指向它的類的指針,佔用16bytes。對於一些只有很少數據的object,16bytes要比對象本身佔用的空間要多。

  • Java String 中在原始String數據的基礎上有另外40bytes的開銷(String的保存形式是Char的數組,並且有length的額外數據)。因為String內部使用UTF-16編碼,每個char 佔用2個byte。因此10個字符的String,將會很輕易地佔用60個bytes。

  • 諸如HashMap,LinkedList 的集合類,使用鏈式結構,每個entry(Map.Entry)都有一個包裝類。這些類不僅有“object header”,還有指向下一個對象的指針(通常是8個bytes)。

  • 基本類型的集合,通常會被包裝成對象類型。

3、內存管理

Spark中的內存使用主要有兩類:執行內存和存儲內存。執行內存是指shuffles, joins, sorts and aggregations計算時用到的內存。存儲內存主要是指cache和集群間傳播的內部數據用到的內存。執行內存和存儲內存使用的是同一塊區域。當沒有計算執行時, 存儲將獲得所有這塊區域的可用內存,反之亦然。執行比存儲具有更高的獲取內存的優先級,也就是說,如果內存不夠時,存儲會釋放一部分內存給執行用,直到存儲需要的最低的閥值。

有兩個相關的配置,但是通常來說,用戶不需要改變其默認值。

  • spark.memory.fraction 表示使用的Java 堆內存的比例。默認值0.6. 剩下的40%的內存用於:(a)存儲用戶數據、Spark內部元數據 (b)防止OOM。

  • spark.memory.storageFraction 表示上面所說的存儲內存最少佔用的比例。默認值 是0.5。

4、確定內存消耗

最好的方式是生成一個RDD並cache,在web UI 中的 Storage 中查看佔用了多少內存。

確定一個指定object 佔用內存的大小,可以使用 SizeEstimator.estimate(obj) 方法。

5、調整數據結構

減少內存消耗,首先應該避免使用基於指針的數據結構和包裝對象等諸如此類的Java特性。有以下幾種途徑:

  • 數據結構優先使用對象數組和基本類型,儘量不使用Java和scala裡的集合類(如:HashMap)。可以使用 fastutil (http://fastutil.di.unimi.it/) 提供的集合類和基本類型。

  • 儘量避免使用有很多小對象和指針的內嵌結構。

  • 考慮使用數字ID 和枚舉類代替作為key的String。

  • 如果內存小於32GB,在Spark-env.sh 裡設置-XX:+UseCompressedOops,這樣指針使用4bytes 而不是8bytes。

6、序列化RDD 存儲

當你的 object 仍然很大,簡單的降低內存消耗的方法是使用序列化的存儲方法。強烈建議使用kyro序列化機制。

7、垃圾回收調優

垃圾回收的時間主要是花費在尋找那些不再被引用的對象。因此它跟Java Object 的數量有關。我們應該使用具有較少object的數據結構(如:使用array代替linkedList)。一種較好的方法是用序列化的形式持久化Object,這樣每個RDD partition 只有一個字節數組。

測量GC的影響:在Java option 中加入 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 後,可在worker 的 stdout 中找到GC的日誌。

  • 在任務完成之前,如果有多次full GC,說明執行任務的內存不夠。

  • 如果有多次minor GC,但是 full GC 並不多,可以增大 Eden 區的大小。

  • 在GC的日誌中,如果老年代快滿了,減少 spark.memory.fraction 以降低cache所用的內存。

  • 嘗試使用 G1 垃圾回收器(-XX:+UseG1GC)。如果堆比較大,應該增加 G1 區的大小(通過 -XX:G1HeapRegionSize 設置)。

  • 如果任務是從HDFS上讀數據,HDFS 塊的大小為 128M,塊解壓後的大小一般為原始大小的2~3倍,如果要運行4個task,可以估算Eden區需要 4*3*128M。

8、其它

  • 並行度。除非你手動每步都設置較高的並行度,否則,集群不會被最大化地利用。Spark會自動根據每個文件的大小設置相應的task數量。對於諸如groupByKey,reduceByKey 等 reduce 操作,並行度為最大的父 RDD 的 partition 的數量。可以配置 spark.default.parallelism 設置默認的並行度。一般來講,建議一個CPU 運行 2~3個task。

  • Reduce Task 的內存使用。有時候,發生OOM並不是因為內存中放不下RDD,而是因為某個或幾個task 分配的內存不夠。例如:某個groupByKey 操作處理很大的數據集(因為數據傾斜的緣故)。 簡單的解決方法是:設置較高的並行度。

  • 廣播大的變量。 使用廣播的功能能有效地減少序列化的 task 的大小和集群加載job的花消。如果你的task中需要使用一個來自driver的大的object(如:靜態查詢表),應該把它轉化成廣播變量。 Master端會打印序列化後的 task 的大小,通常如果大於20KB 的話,就值得去優化。

  • 數據本地性。數據本地性可分為以下幾類:

    PROCESS_LOCAL 數據在運行代碼的JVM中。

    NODE_LOCAL 數據和運行的代碼在同一臺機器上。如:當前節點上正好有HDFS的數據塊。

    NO_PREF 數據可以較快獲取,但是不在本地。

    RACK_LOCAL 數據在同一 機架內,需要通過network獲取。

    Any 除上述外的數據。

最好的情況就是 task 都運行在最好的數據本地性的環境,但通常不太可能。很多時候,某個executor 上的任務都完成了,而其它忙碌的機器上尚有未處理的data。Spark通常會等一段時間,以等待忙碌的機器空閒下來去處理數據(因為具有較高的本地性)。當超過這個等待時間後,空間的executor會把這些數據拉過來進行處理。每個數據本地性級別對應的等待時間可以查看配置中的spark.locality部分。通常默認的配置工作得蠻好的。如果你的task運行時間較長,可以增加這些值。


分享到:


相關文章: