Spark 內存管理
在執行Spark 的應用程序時,Spark 集群會啟動 Driver 和 Executor 兩種 JVM 進程,前者為主控進程,負責創建 Spark 上下文,提交 Spark 作業(Job),並將作業轉化為計算任務(Task),在各個 Executor 進程間協調任務的調度,後者負責在工作節點上執行具體的計算任務,並將結果返回給 Driver,同時為需要持久化的 RDD 提供存儲功能。由於 Driver 的內存管理相對來說較為簡單,本節主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存。
1 堆內和堆外內存規劃
作為一個 JVM 進程,Executor 的內存管理建立在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更為詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開闢空間,進一步優化了內存的使用。
堆內內存受到JVM統一管理,堆外內存是直接向操作系統進行內存的申請和釋放。
圖1-1 Executor堆內與堆外內存
1.1堆內內存
堆內內存的大小,由 Spark 應用程序啟動時的 –executor-memory 或 spark.executor.memory 參數配置。Executor 內運行的併發任務共享 JVM 堆內內存,這些任務在緩存 RDD 數據和廣播(Broadcast)數據時佔用的內存被規劃為存儲(Storage)內存,而這些任務在執行 Shuffle 時佔用的內存被規劃為執行(Execution)內存,剩餘的部分不做特殊規劃,那些 Spark 內部的對象實例,或者用戶定義的 Spark 應用程序中的對象實例,均佔用剩餘的空間。不同的管理模式下,這三部分佔用的空間大小各不相同。
Spark 對堆內內存的管理是一種邏輯上的”規劃式”的管理,因為對象實例佔用內存的申請和釋放都由 JVM 完成,Spark 只能在申請後和釋放前記錄這些內存,我們來看其具體流程:
申請內存流程如下:
- Spark 在代碼中 new 一個對象實例;
- JVM 從堆內內存分配空間,創建對象並返回對象引用;
- Spark 保存該對象的引用,記錄該對象佔用的內存。
釋放內存流程如下:
- Spark記錄該對象釋放的內存,刪除該對象的引用;
- 等待JVM的垃圾回收機制釋放該對象佔用的堆內內存。
我們知道,JVM 的對象可以以序列化的方式存儲,序列化的過程是將對象轉換為二進制字節流,本質上可以理解為將非連續空間的鏈式存儲轉化為連續空間或塊存儲,在訪問時則需要進行序列化的逆過程——反序列化,將字節流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。
對於 Spark 中序列化的對象,由於是字節流的形式,其佔用的內存大小可直接計算,而對於非序列化的對象,其佔用的內存是通過週期性地採樣近似估算而得,即並不是每次新增的數據項都會計算一次佔用的內存大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際內存有可能遠遠超出預期。此外,在被 Spark 標記為釋放的對象實例,很有可能在實際上並沒有被 JVM 回收,導致實際可用的內存小於 Spark 記錄的可用內存。所以 Spark 並不能準確記錄實際可用的堆內內存,從而也就無法完全避免內存溢出(OOM, Out of Memory)的異常。
雖然不能精準控制堆內內存的申請和釋放,但 Spark 通過對存儲內存和執行內存各自獨立的規劃管理,可以決定是否要在存儲內存裡緩存新的 RDD,以及是否為新的任務分配執行內存,在一定程度上可以提升內存的利用率,減少異常的出現。
- 堆外內存
為了進一步優化內存的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開闢空間,存儲經過序列化的二進制數據。
堆外內存意味著把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時不再基於 Tachyon,而是與堆外的執行內存一樣,基於 JDK Unsafe API 實現),Spark 可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內存可以被精確地申請和釋放(堆外內存之所以能夠被精確的申請和釋放,是由於內存的申請和釋放不再通過JVM機制,而是直接向操作系統申請,JVM對於內存的清理是無法準確指定時間點的,因此無法實現精確的釋放),而且序列化的數據佔用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。
在默認情況下堆外內存並不啟用,可通過配置 spark.memory.offHeap.enabled 參數啟用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的劃分方式相同,所有運行中的併發任務共享存儲內存和執行內存。
(該部分內存主要用於程序的共享庫、Perm Space、線程Stack和一些Memory mapping等, 或者類C方式allocate object)
6.2 內存空間分配
1.靜態內存管理
在 Spark 最初採用的靜態內存管理機制下,存儲內存、執行內存和其他內存的大小在 Spark 應用程序運行期間均為固定的,但用戶可以應用程序啟動前進行配置,堆內內存的分配如圖 2 所示:
圖1-2 靜態內存管理——堆內內存
可以看到,可用的堆內內存的大小需要按照代碼清單1-1的方式計算:
代碼清單1-1 堆內內存計算公式
可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction
可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction
其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最後可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,降低因實際內存超出當前預設範圍而導致 OOM 的風險(上文提到,對於非序列化對象的內存採樣估算會產生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 並沒有區別對待,和”其它內存”一樣交給了 JVM 去管理。
Storage內存和Execution內存都有預留空間,目的是防止OOM,因為Spark堆內內存大小的記錄是不準確的,需要留出保險區域。
堆外的空間分配較為簡單,只有存儲內存和執行內存,如圖1-3所示。可用的執行內存和存儲內存佔用的空間大小直接由參數spark.memory.storageFraction 決定,由於堆外內存佔用的空間可以被精確計算,所以無需再設定保險區域。
圖1-3 靜態內存管理
靜態內存管理機制實現起來較為簡單,但如果用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或做相應的配置,很容易造成”一半海水,一半火焰”的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。由於新的內存管理機制的出現,這種方式目前已經很少有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。
- 統一內存管理
Spark 1.6 之後引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,可以動態佔用對方的空閒區域,統一內存管理的堆內內存結構如圖 1-4所示:
圖1-4 統一內存管理——堆內內存
統一內存管理的堆外內存結構如圖 1-5所示:
圖1-5 統一內存管理——堆外內存
其中最重要的優化在於動態佔用機制,其規則如下:
- 設定基本的存儲內存和執行內存區域(spark.storage.storageFraction 參數),該設定確定了雙方各自擁有的空間的範圍;
- 雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
- 執行內存的空間被對方佔用後,可讓對方將佔用的部分轉存到硬盤,然後”歸還”借用的空間;
- 存儲內存的空間被對方佔用後,無法讓對方”歸還”,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為複雜。
統一內存管理的動態佔用機制如圖 1-6所示:
圖1-6 同一內存管理——動態佔用機制
憑藉統一內存管理機制,Spark 在一定程度上提高了堆內和堆外內存資源的利用率,降低了開發者維護 Spark 內存的難度,但並不意味著開發者可以高枕無憂。如果存儲內存的空間太大或者說緩存的數據過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的 RDD 數據通常都是長期駐留內存的。所以要想充分發揮 Spark 的性能,需要開發者進一步瞭解存儲內存和執行內存各自的管理方式和實現原理。
6.3 存儲內存管理
1.RDD的持久化機制
彈性分佈式數據集(RDD)作為 Spark 最根本的數據抽象,是隻讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上創建,或者在其他已有的 RDD 上執行轉換(Transformation)操作產生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark保證了每一個 RDD 都可以被重新恢復。但 RDD 的所有轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 才會創建任務讀取 RDD,然後真正觸發轉換的執行。
Task 在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查 Checkpoint 或按照血統重新計算。所以如果一個 RDD 上要執行多次行動,可以在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在後面的行動時提升計算速度。
事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,便可以對緩存 RDD 時使用的內存做統一的規劃和管理。
RDD 的持久化由 Spark 的 Storage 模塊負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。
Storage 模塊在邏輯上以 Block 為基本存儲單位,RDD 的每個 Partition 經過處理後唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Driver端的Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而Executor端的 Slave 需要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。
圖5-1 Storage模塊示意圖
在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的存儲級別 ,而存儲級別是以下 5 個變量的組合:
代碼清單5-1 resourceOffer代碼
class StorageLevel private(
private var _useDisk: Boolean, //磁盤
private var _useMemory: Boolean, //這裡其實是指堆內內存
private var _useOffHeap: Boolean, //堆外內存
private var _deserialized: Boolean, //是否為非序列化
private var _replication: Int = 1 //副本個數
)
Spark中7種存儲級別如下:
表5-1 Spark持久化級別
通過對數據結構的分析,可以看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:
1) 存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗餘備份。OFF_HEAP 則是隻在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其他位置。
2) 存儲形式:Block 緩存到存儲內存後,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
3) 副本數量:大於 1 時需要遠程冗餘備份到其他節點。如 DISK_ONLY_2 需要遠程備份 1 個副本。
- RDD的緩存過程
RDD 在緩存到存儲內存之前,Partition 中的數據一般以迭代器(Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。通過 Iterator 可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些 Record 的對象實例在邏輯上佔用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不同 Record
的存儲空間並不連續。RDD 在緩存到存儲內存之後,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中佔用一塊連續的空間。將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為"展開"(Unroll)。
Block 有序列化和非序列化兩種存儲格式,具體以哪種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲所有的對象實例,序列化的 Block 則以 SerializedMemoryEntry的數據結構定義,用字節緩衝區(ByteBuffer)來存儲二進制數據。每個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的 Block 對象的實例,對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。
因為不能保證存儲空間可以一次容納 Iterator 中的所有數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時佔位,空間不足則 Unroll 失敗,空間足夠時可以繼續進行 。
對於序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。
對於非序列化的 Partition 則要在遍歷 Record 的過程中依次申請,即每讀取一條 Record,採樣估算其所需的 Unroll 空間並進行申請,空間不足時可以中斷,釋放已佔用的 Unroll 空間。
如果最終 Unroll 成功,當前 Partition 所佔用的 Unroll 空間被轉換為正常的緩存 RDD 的存儲空間,如下圖所示。
圖5-2 Spark Unroll
在靜態內存管理時,Spark 在存儲內存中專門劃分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態佔用機制進行處理。
- 淘汰與落盤
由於同一個 Executor 的所有的計算任務共享有限的存儲內存空間,當有新的 Block需要緩存但是剩餘空間不足且無法動態佔用時,就要對 LinkedHashMap中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接刪除該 Block。
存儲內存的淘汰規則為:
- 被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存;
- 新舊 Block 不能屬於同一個 RDD,避免循環淘汰;
- 舊 Block 所屬 RDD 不能處於被讀狀態,避免引發一致性問題;
- 遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。
落盤的流程則比較簡單,如果其存儲級別符合useDisk 為 true 的條件,再根據其deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最後將數據存儲到磁盤,在 Storage 模塊中更新其信息。
6.4 執行內存管理
執行內存主要用來存儲任務在執行 Shuffle 時佔用的內存,Shuffle 是按照一定規則對 RDD 數據重新分區的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:
- Shuffle Write
1) 若在 map 端選擇普通的排序方式,會採用 ExternalSorter 進行外排,在內存中存儲數據時主要佔用堆內執行空間。
2) 若在 map 端選擇 Tungsten 的排序方式,則採用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時可以佔用堆外或堆內執行空間,取決於用戶是否開啟了堆外內存以及堆外執行內存是否足夠。
- Shuffle Read
1) 在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時佔用堆內執行空間。
2) 如果需要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,佔用堆內執行空間。
在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程中所有數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行週期性地採樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其全部內容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最後會被歸併(Merge)。
Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃(鎢絲計劃),解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否採用 Tungsten 排序。
Tungsten 採用的頁式內存管理機制建立在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心數據具體存儲在堆內還是堆外。
每個內存頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。
堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值為是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 為 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。
Spark 用 MemoryBlock巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每個 Task申請到的內存頁。Tungsten 頁式管理下的所有內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:
- 頁號:佔 13 位,唯一標識一個內存頁,Spark 在申請內存頁之前要先申請空閒頁號。
- 頁內偏移量:佔 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。
有了統一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只需要對指針進行排序,並且無需反序列化,整個過程非常高效,對於內存訪問效率和 CPU 使用效率帶來了明顯的提升。
Spark 的存儲內存和執行內存有著截然不同的管理方式:對於存儲內存來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的數據,在 Tungsten 排序中甚至抽象成為頁式內存管理,開闢了全新的 JVM 內存管理機制。
閱讀更多 kane0409 的文章