Spark BlockManager解析

1 BlockManager數據存儲與管理機制

BlockManager是整個Spark底層負責數據存儲與管理的一個組件,Driver和Executor的所有數據都由對應的BlockManager進行管理。

Driver上有BlockManagerMaster,負責對各個節點上的BlockManager內部管理的數據的元數據進行維護,比如block的增刪改等操作,都會在這裡維護好元數據的變更。

每個節點都有一個BlockManager,每個BlockManager創建之後,第一件事即使去向BlockManagerMaster進行註冊,此時BlockManagerMaster會為其長難句對應的BlockManagerInfo。

BlockManager運行原理如下圖所示:

Spark BlockManager解析

圖7-1 BlockManager原理

BlockManagerMaster與BlockManager的關係非常像NameNode與DataNode的關係,BlockManagerMaster中保存中BlockManager內部管理數據的元數據,進行維護,當BlockManager進行Block增刪改等操作時,都會在BlockManagerMaster中進行元數據的變更,這與NameNode維護DataNode的元數據信息,DataNode中數據發生變化時NameNode中的元數據信息也會相應變化是一致的。

每個節點上都有一個BlockManager,BlockManager中有3個非常重要的組件:

  • DiskStore:負責對磁盤數據進行讀寫;
  • MemoryStore:負責對內存數據進行讀寫;
  • BlockTransferService:負責建立BlockManager到遠程其他節點的BlockManager的連接,負責對遠程其他節點的BlockManager的數據進行讀寫;

每個BlockManager創建之後,做的第一件事就是想BlockManagerMaster進行註冊,此時BlockManagerMaster會為其創建對應的BlockManagerInfo。

使用BlockManager進行寫操作時,比如說,RDD運行過程中的一些中間數據,或者我們手動指定了persist(),會優先將數據寫入內存中,如果內存大小不夠,會使用自己的算法,將內存中的部分數據寫入磁盤;此外,如果persist()指定了要replica,那麼會使用BlockTransferService將數據replicate一份到其他節點的BlockManager上去。

使用BlockManager進行讀操作時,比如說,shuffleRead操作,如果能從本地讀取,就利用DiskStore或者MemoryStore從本地讀取數據,但是本地沒有數據的話,那麼會用BlockTransferService與有數據的BlockManager建立連接,然後用BlockTransferService從遠程BlockManager讀取數據;例如,shuffle Read操作中,很有可能要拉取的數據在本地沒有,那麼此時就會到遠程有數據的節點上,找那個節點的BlockManager來拉取需要的數據。

只要使用BlockManager執行了數據增刪改的操作,那麼必須將Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo內部的BlockStatus進行增刪改操作,從而達到元數據的維護功能。

7.2 Spark 共享變量底層實現

Spark一個非常重要的特性就是共享變量。

默認情況下,如果在一個算子的函數中使用到了某個外部的變量,那麼這個變量的值會被拷貝到每個task中,此時每個task只能操作自己的那份變量副本。如果多個task想要共享某個變量,那麼這種方式是做不到的。

Spark為此提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另一種是Accumulator(累加變量)。Broadcast Variable會將用到的變量,僅僅為每個節點拷貝一份,即每個Executor拷貝一份,更大的用途是優化性能,減少網絡傳輸以及內存損耗。Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。Broadcast Variable是共享讀變量,task不能去修改它,而Accumulator可以讓多個task操作一個變量。

7.2.1 廣播變量

廣播變量允許編程者在每個Executor上保留外部數據的只讀變量,而不是給每個任務發送一個副本。

每個task都會保存一份它所使用的外部變量的副本,當一個Executor上的多個task都使用一個大型外部變量時,對於Executor內存的消耗是非常大的,因此,我們可以將大型外部變量封裝為廣播變量,此時一個Executor保存一個變量副本,此Executor上的所有task共用此變量,不再是一個task單獨保存一個副本,這在一定程度上降低了Spark任務的內存佔用。

Spark BlockManager解析

圖7-2 task使用外部變量

Spark BlockManager解析

圖7-3 使用廣播變量

Spark還嘗試使用高效的廣播算法分發廣播變量,以降低通信成本。

Spark提供的Broadcast Variable是隻讀的,並且在每個Executor上只會有一個副本,而不會為每個task都拷貝一份副本,因此,它的最大作用,就是減少變量到各個節點的網絡傳輸消耗,以及在各個節點上的內存消耗。此外,Spark內部也使用了高效的廣播算法來減少網絡消耗。

可以通過調用SparkContext的broadcast()方法來針對每個變量創建廣播變量。然後在算子的函數內,使用到廣播變量時,每個Executor只會拷貝一份副本了,每個task可以使用廣播變量的value()方法獲取值。

在任務運行時,Executor並不獲取廣播變量,當task執行到 使用廣播變量的代碼時,會向Executor的內存中請求廣播變量,如下圖所示:

Spark BlockManager解析

圖7-4 task向Executor請求廣播變量

之後Executor會通過BlockManager向Driver拉取廣播變量,然後提供給task進行使用,如下圖所示:

Spark BlockManager解析

圖7-5 Executor從Driver拉取廣播變量

廣播大變量是Spark中常用的基礎優化方法,通過減少內存佔用實現任務執行性能的提升。

7.2.2 累加器

累加器(accumulator):Accumulator是僅僅被相關操作累加的變量,因此可以在並行中被有效地支持。它們可用於實現計數器(如MapReduce)或總和計數。

Accumulator是存在於Driver端的,集群上運行的task進行Accumulator的累加,隨後把值發到Driver端,在Driver端彙總(Spark UI在SparkContext創建時被創建,即在Driver端被創建,因此它可以讀取Accumulator的數值),由於Accumulator存在於Driver端,從節點讀取不到Accumulator的數值。

Spark提供的Accumulator主要用於多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能,但是卻給我們提供了多個task對於同一個變量並行操作的功能,但是task只能對Accumulator進行累加操作,不能讀取它的值,只有Driver程序可以讀取Accumulator的值。

Accumulator的底層原理如下圖所示:

Spark BlockManager解析

圖7-6 累加器原理


分享到:


相關文章: