RDD-分區器(Partitioner)

RDD之間的依賴關係如果是Shuffle依賴,上游的RDD該如何確定每個分區的輸出將交由下游RDD的那個分區呢?或者下游RDD的各個分區將具體依賴於上游RDD的哪些分區?Spark提供分區計算器來解決這個問題。

ShuffleDependency的partitioner屬性的類型是Partitioner,抽象類Partitioner定義了分區計算器的接口規範,ShuffleDependency分區取決於Partitioner的具體實現。Partitioner源碼,位於org.apache.spark.Partitioner

Partitioner的伴生對象,位於org.apache.spark.Partitioner

Partitioner的numPartitions方法用於獲取分區數量。Partitioner的getPartition方法用於將輸入的key映射到下游RDD的從0到numPartitions-1這樣範圍中的某一個分區。

分區程序必須是確定性的,即它必須在給定相同分區鍵的情況下返回相同的分區ID。

如果設置了spark.default.parallelism,我們將使用SparkContext defaultParallelism的值作為默認分區號,否則我們將使用上游分區的最大數量。

HashPartition

HashPartitioner是Spark的默認分區程序。 如果我們沒有配置任何分區器,那麼Spark將使用這個散列分區器來對數據進去分區。

注意 Java數組具有基於數組的身份而不是其內容的hashCode,因此嘗試使用HashPartitioner對RDD [Array []]或RDD[(Array [],_)]進行分區將產生意外或不正確的結果。

HashPartitioner源碼,位於org.apache.spark.Partitioner

HashPartitioner增加另一個名為partitions的構造器參數作為分區數,重寫的numPartitions方法只返回partitions。重寫的getPartition方法實際以key的hashCode和numPartitions作為參數調用Utils的nonNegativeMod方法。nonNegativeMod方法將對key的hashCode和numPartitions進行驅魔運算,得到key對應分區索引。

使用哈希和取模的方式,可以方便地計算出下游RDD的各個分區將具體處理那些key。由於上游RDD所處理的key的哈希值在取模後很可能產生數據傾斜,所以HashPartitioner並不是一個均衡的分區計算器。

RangePartitioner

從HashPartitioner分區的實現原理我們可以看出,其結果可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據,這顯然不是我們需要的。而RangePartitioner分區則儘量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,也就是說一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。簡單的說就是將一定範圍內的數映射到某一個分區內。

RangePartitioner分區器的主要作用就是將一定範圍內的數映射到某一個分區內,所以它的實現中分界的算法尤為重要。該分區器的實現方式主要是通過兩個步驟來實現的,

  • 第一步:先重整個RDD中抽取出樣本數據,將樣本數據排序,計算出每個分區的最大key值,形成一個Array[KEY]類型的數組變量rangeBounds;
  • 第二步:判斷key在rangeBounds中所處的範圍,給出該key值在下一個RDD中的分區id下標;該分區器要求RDD中的KEY類型必須是可以排序的

RangePartitioner源碼,位於org.apache.spark.Partitioner

其實RangePartitioner的重點是在於構建rangeBounds數組對象,主要步驟是:

  1. 如果分區數量小於1或者rdd中不存在數據的情況下,直接返回一個空的數組,不需要計算range的邊界;如果分區數據大於1的情況下,而且rdd中有數據的情況下,才需要計算數組對象
  1. 計算總體的數據抽樣大小sampleSize,計算規則是:至少每個分區抽取20個數據或者最多1M的數據量
  1. 根據sampleSize和分區數量計算每個分區的數據抽樣樣本數量sampleSizePrePartition
  1. 調用RangePartitioner的sketch函數進行數據抽樣,計算出每個分區的樣本
  1. 計算樣本的整體佔比以及數據量過多的數據分區,防止數據傾斜
  1. 對於數據量比較多的RDD分區調用RDD的sample函數API重新進行數據抽取
  1. 將最終的樣本數據通過RangePartitoner的determineBounds函數進行數據排序分配,計算出rangeBounds

RangePartitioner的sketch函數的作用是對RDD中的數據按照需要的樣本數據量進行數據抽取,主要調用SamplingUtils類的reservoirSampleAndCount方法對每個分區進行數據抽取,抽取後計算出整體所有分區的數據量大小;reservoirSampleAndCount方法的抽取方式是先從迭代器中獲取樣本數量個數據(順序獲取), 然後對剩餘的數據進行判斷,替換之前的樣本數據,最終達到數據抽樣的效果。

RangePartitioner的determineBounds函數的作用是根據樣本數據記憶權重大小確定數據邊界, 源碼:

一般而已,使用默認的HashPartitioner即可,RangePartitioner的使用有一定的侷限性


分享到:


相關文章: