算子優化 MapPartitions
spark中,最基本的原則,就是每個task處理一個RDD的partition。
MapPartitions操作的優點:
如果是普通的map,比如一個partition中有1萬條數據;ok,那麼你的function要執行和計算1萬次。
但是,使用MapPartitions操作之後,一個task僅僅會執行一次function,function一次接收所有的partition數據。只要執行一次就可以了,性能比較高。
MapPartitions的缺點:一定是有的。
如果是普通的map操作,一次function的執行就處理一條數據;那麼如果內存不夠用的情況下,比如處理了1千條數據了,那麼這個時候內存不夠了,那麼就可以將已經處理完的1千條數據從內存裡面垃圾回收掉,或者用其他方法,騰出空間來吧。
所以說普通的map操作通常不會導致內存的OOM異常。
但是MapPartitions操作,對於大量數據來說,比如甚至一個partition,100萬數據,一次傳入一個function以後,那麼可能一下子內存不夠,但是又沒有辦法去騰出內存空間來,可能就OOM,內存溢出。
什麼時候比較適合用MapPartitions系列操作,就是說,數據量不是特別大的時候,都可以用這種MapPartitions系列操作,性能還是非常不錯的,是有提升的。比如原來是15分鐘,(曾經有一次性能調優),12分鐘。10分鐘->9分鐘。
但是也有過出問題的經驗,MapPartitions只要一用,直接OOM,內存溢出,崩潰。
在項目中,自己先去估算一下RDD的數據量,以及每個partition的量,還有自己分配給每個executor的內存資源。看看一下子內存容納所有的partition數據,行不行。如果行,可以試一下,能跑通就好。性能肯定是有提升的。
但是試了一下以後,發現,不行,OOM了,那就放棄吧。
算子優化 reduceByKey
transformation 操作,類似於MapReduce 中的combiner
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val counts = pairs.reduceByKey(_ + _)
counts.collect()
reduceByKey,相較於普通的shuffle操作(比如groupByKey),它的一個特點,就是說,
會進行map端的本地聚合。
對map端給下個stage每個task創建的輸出文件中,寫數據之前,就會進行本地的combiner操作,
也就是說對每一個key,對應的values,都會執行你的算子函數(_ + _)
用reduceByKey對性能的提升:
1、在本地進行聚合以後,在map端的數據量就變少了,減少磁盤IO。而且可以減少磁盤空間的佔用。
2、下一個stage,拉取數據的量,也就變少了。減少網絡的數據傳輸的性能消耗。
3、在reduce端進行數據緩存的內存佔用變少了。
4、reduce端,要進行聚合的數據量也變少了。
總結:
reduceByKey在什麼情況下使用呢?
1、非常普通的,比如說,就是要實現類似於wordcount程序一樣的,對每個key對應的值,
進行某種數據公式或者算法的計算(累加、累乘)
2、對於一些類似於要對每個key進行一些字符串拼接的這種較為複雜的操作,可以自己衡量一下,
其實有時,也是可以使用reduceByKey來實現的。但是不太好實現。如果真能夠實現出來,
對性能絕對是有幫助的。(shuffle基本上就佔了整個
Spark作業的90%以上的性能消耗,主要能對shuffle進行一定的調優,都是有價值的)
我們的程序沒有那麼去做!但是把這個當作一個課後思考題給大家,看大家能不能對我們的聚合session
的操作應用上ReduceByKey來提高性能!
算子優化 repartiton
算子調優之使用repartition解決Spark SQL低並行度的性能問題
spark.sql.shuffle.partitions 調整DataFrame的shuffle並行度
spark.default.parallelism 調整RDD的shuffle並行度
並行度:之前說過,並行度是自己可以調節,或者說是設置的。
1、spark.default.parallelism
2、textFile(),傳入第二個參數,指定partition數量(比較少用)
咱們的項目代碼中,沒有設置並行度,實際上,在生產環境中,是最好自己設置一下的。
官網有推薦的設置方式,你的spark-submit腳本中,會指定你的application總共要啟動多少個executor,
100個;每個executor多少個cpu core,2~3個;總共application,有cpu core,200個。
官方推薦,根據你的application的總cpu core數量(在spark-submit中可以指定,200個),
自己手動設置spark.default.parallelism參數,指定為cpu core總數的2~3倍。400~600個並行度。600。
承上啟下
你設置的這個並行度,在哪些情況下會生效?哪些情況下,不會生效?
如果你壓根兒沒有使用Spark SQL(DataFrame),那麼你整個spark application默認所有stage的並行度
都是你設置的那個參數。(除非你使用coalesce算子縮減過partition數量)
問題來了,Spark SQL,用了。用Spark SQL的那個stage的並行度,你沒法自己指定。
Spark SQL自己會默認根據hive表對應的hdfs文件的block,自動設置Spark SQL查詢所在的那個stage的
並行度。你自己通過spark.default.parallelism參數指定的並行度,只會在沒有Spark SQL的stage中生效。
比如你第一個stage,用了Spark SQL從hive表中查詢出了一些數據,然後做了一些transformation操作,
接著做了一個shuffle操作(groupByKey);下一個stage,在shuffle操作之後,
做了一些transformation操作。hive表,對應了一個hdfs文件,有20個block;
你自己設置了spark.default.parallelism參數為100。
你的第一個stage的並行度,是不受你的控制的,就只有20個task;第二個stage,
才會變成你自己設置的那個並行度,100。
問題在哪裡?
Spark SQL默認情況下,它的那個並行度,咱們沒法設置。可能導致的問題,也許沒什麼問題,
也許很有問題。Spark SQL所在的那個stage中,後面的那些transformation操作,
可能會有非常複雜的業務邏輯,甚至說複雜的算法。如果你的Spark SQL默認把task數量設置的很少,
20個,然後每個task要處理為數不少的數據量,然後還要執行特別複雜的算法。
這個時候,就會導致第一個stage的速度,特別慢。第二個stage,1000個task,刷刷刷,非常快。
解決上述Spark SQL無法設置並行度和task數量的辦法,是什麼呢?
repartition算子,你用Spark SQL這一步的並行度和task數量,肯定是沒有辦法去改變了。但是呢,
可以將你用Spark SQL查詢出來的RDD,使用repartition算子,去重新進行分區,
此時可以分區成多個partition,比如從20個partition,分區成100個。
然後呢,從repartition以後的RDD,再往後,並行度和task數量,就會按照你預期的來了。
就可以避免跟Spark SQL綁定在一個stage中的算子,只能使用少量的task去處理大量數據以及
複雜的算法邏輯。
這裡就很有可能發生上面說的問題
比如說,Spark SQl默認就給第一個stage設置了20個task,但是根據你的數據量以及算法的複雜度
實際上,你需要1000個task去並行執行
所以說,在這裡,就可以對Spark SQL剛剛查詢出來的RDD執行repartition重分區操作
算子優化 filter
默認情況下,經過了這種filter之後,RDD中的每個partition的數據量,可能都不太一樣了。
(原本每個partition的數據量可能是差不多的)
問題:
1、每個partition數據量變少了,但是在後面進行處理的時候,還是要跟partition數量一樣數量的task,
來進行處理;有點浪費task計算資源。
2、每個partition的數據量不一樣,會導致後面的每個task處理每個partition的時候,
每個task要處理的數據量就不同,這個時候很容易發生什麼問題?
數據傾斜。。。。
比如說,第二個partition的數據量才100;但是第三個partition的數據量是900;
那麼在後面的task處理邏輯一樣的情況下,不同的task要處理的數據量可能差別達到了9倍,
甚至10倍以上;同樣也就導致了速度的差別在9倍,甚至10倍以上。
這樣的話呢,就會導致有些task運行的速度很快;有些task運行的速度很慢。這,就是數據傾斜。
針對上述的兩個問題,我們希望應該能夠怎麼樣?
1、針對第一個問題,我們希望可以進行partition的壓縮吧,因為數據量變少了,
那麼partition其實也完全可以對應的變少。比如原來是4個partition,現在完全可以變成2個partition。
那麼就只要用後面的2個task來處理即可。就不會造成task計算資源的浪費。
(不必要,針對只有一點點數據的partition,還去啟動一個task來計算)
2、針對第二個問題,其實解決方案跟第一個問題是一樣的;也是去壓縮partition,
儘量讓每個partition的數據量差不多。那麼這樣的話,後面的task分配到的partition的數據量
也就差不多。不會造成有的task運行速度特別慢,有的task運行速度特別快。避免了數據傾斜的問題。
有了解決問題的思路之後,接下來,我們該怎麼來做呢?實現?
算子優化 coalesce算子
主要就是用於在filter操作之後,針對每個partition的數據量各不相同的情況,來壓縮partition的數量。
減少partition的數量,而且讓每個partition的數據量都儘量均勻緊湊。
從而便於後面的task進行計算操作,在某種程度上,能夠一定程度的提升性能。
說明一下:
這兒,是對完整的數據進行了filter過濾,過濾出來點擊行為的數據點擊行為的數據其實只佔總數據的一小部分(譬如 20%)
所以過濾以後的RDD,每個partition的數據量,很有可能跟我們之前說的一樣,會很不均勻而且數據量肯定會變少很多
所以針對這種情況,還是比較合適用一下coalesce算子的,在filter過後去減少partition的數量
coalesce(100)
這個就是說經過filter之後再把數據壓縮的比較緊湊,壓縮為100個數據分片,也就是形成了 100 個 partition
對這個coalesce操作做一個說明
如果運行模式都是local模式,主要是用來測試,所以local模式下,
不用去設置分區和並行度的數量
local模式自己本身就是進程內模擬的集群來執行,本身性能就很高
而且對並行度、partition數量都有一定的內部的優化
這裡我們再自己去設置,就有點畫蛇添足
但是就是跟大家說明一下,coalesce算子的使用,即可
算子優化 foreachPartition
foreach的寫庫原理
默認的foreach的性能缺陷在哪裡?
首先,對於每條數據,都要單獨去調用一次function,task為每個數據,都要去執行一次function函數。
如果100萬條數據,(一個partition),調用100萬次。性能比較差。
另外一個非常非常重要的一點
如果每個數據,你都去創建一個數據庫連接的話,那麼你就得創建100萬次數據庫連接。
但是要注意的是,數據庫連接的創建和銷燬,都是非常非常消耗性能的。雖然我們之前已經用了
數據庫連接池,只是創建了固定數量的數據庫連接。
你還是得多次通過數據庫連接,往數據庫(MySQL)發送一條SQL語句,然後MySQL需要去執行這條SQL語句。
如果有100萬條數據,那麼就是100萬次發送SQL語句。
以上兩點(數據庫連接,多次發送SQL語句),都是非常消耗性能的。
foreachPartition,在生產環境中,通常來說,都使用foreachPartition來寫數據庫的
使用批處理操作(一條SQL和多組參數)
發送一條SQL語句,發送一次
一下子就批量插入100萬條數據。
用了foreachPartition算子之後,好處在哪裡?
1、對於我們寫的function函數,就調用一次,一次傳入一個partition所有的數據
2、主要創建或者獲取一個數據庫連接就可以
3、只要向數據庫發送一次SQL語句和多組參數即可
在實際生產環境中,清一色,都是使用foreachPartition操作;但是有個問題,跟mapPartitions操作一樣,
如果一個partition的數量真的特別特別大,比如真的是100萬,那基本上就不太靠譜了。
一下子進來,很有可能會發生OOM,內存溢出的問題。
一組數據的對比:生產環境
一個partition大概是1千條左右
用foreach,跟用foreachPartition,性能的提升達到了2~3分鐘。
實際項目操作:
首先JDBCHelper裡面已經封裝好了一次批量插入操作!
批量插入session detail
唯一不一樣的是我們需要ISessionDetailDAO裡面去實現一個批量插入
List
閱讀更多 張安文 的文章