spark中的pair rdd,看這一篇就夠了

今天是spark專題的第四篇文章,我們一起來看下Pair RDD。


定義


在之前的文章當中,我們已經熟悉了RDD的相關概念,也瞭解了RDD基本的轉化操作和行動操作。今天我們來看一下RDD當中非常常見的PairRDD,也叫做鍵值對RDD,可以理解成KVRDD。


KV很好理解,就是key和value的組合,比如Python當中的dict或者是C++以及Java當中的map中的基本元素都是鍵值對。相比於之前基本的RDD,pariRDD可以支持更多的操作,相對來說更加靈活,可以完成更加複雜的功能。比如我們可以根據key進行聚合,或者是計算交集等。


所以本身pairRDD只不過是數據類型是KV結構的RDD而已,並沒有太多的內涵,大家不需要擔心。


Pair RDD轉化操作


Pair RDD也是RDD,所以之前介紹的RDD的轉化操作Pair RDD自然也可以使用。它們兩者有些像是類繼承的關係,RDD是父類,Pair RDD是實現了一些新特性的子類。子類可以調用父類當中所有的方法,但是父類卻不能調用子類中的方法。


調用的時候需要注意,由於我們的Pair RDD中的數據格式是KV的二元組,所以我們傳入的函數必須是針對二元組數據的,不然的話可能運算的結果會有問題。下面我們來列舉一些最常用的轉化操作。


為了方便演示,我們用一個固定的RDD來運行各種轉化操作,來直觀瞭解一下這些轉化操作究竟起什麼樣的作用。

<code>ex1 = sc.parallelize([[1, 2], [3, 4], [3, 5]])/<code>


keys,values和sortByKey


這三個轉化操作應該是最常用也是最簡單的,簡單到我們通過字面意思就可以猜出它們的意思。

我們先來看keys和values:

spark中的pair rdd,看這一篇就夠了

我們的RDD當中二元組當中的第一個元素會被當做key,第二個元素當做value,需要注意的是,它並不是一個map或者是dict,所以key和value都是可以重複的


sortByKey也很直觀,我們從字面意思就看得出來是對RDD當中的數據根據key值進行排序,同樣,我們也來看下結果:

spark中的pair rdd,看這一篇就夠了


mapValues和flatMapValues


mapValues不能直接使用,而必須要傳入一個函數作為參數。它的意思是對所有的value執行這個函數,比如我們想把所有的value全部轉變成字符串,我們可以這麼操作:

spark中的pair rdd,看這一篇就夠了

flatMapValues的操作和我們的認知有些相反,我們都知道flatMap操作是可以將一個嵌套的數組打散,但是我們怎麼對一個value打散嵌套呢?畢竟我們的value不一定就是一個數組,這就要說到我們傳入的函數了,這個flatMap的操作其實是針對函數返回的結果的,也就是說函數會返回一個迭代器,然後打散的內容其實是這個迭代器當中的值。


我這麼表述可能有些枯燥,我們來看一個例子就明白了:

spark中的pair rdd,看這一篇就夠了

不知道這個結果有沒有出乎大家的意料,它的整個流程是這樣的,我們調用flatMapValues運算之後返回一個迭代器,迭代器的內容是range(x, x+3)。其實是每一個key對應一個這樣的迭代器,之後再將迭代器當中的內容打散,和key構成新的pair。


groupByKey,reduceByKey和foldByKey


這兩個功能也比較接近,我們先說第一個,如果學過SQL的同學對於group by操作的含義應該非常熟悉。如果沒有了解過也沒有關係,group by可以簡單理解成歸併或者是分桶。也就是說將key值相同的value歸併到一起,得到的結果是key-list的Pair RDD,也就是我們把key值相同的value放在了一個list當中。


我們也來看下例子:

spark中的pair rdd,看這一篇就夠了

我們調用完groupby之後得到的結果是一個對象,所以需要調用一下mapValues將它轉成list才可以使用,否則的話是不能使用collect獲取的。


reduceByKey和groupByKey類似,只不過groupByKey只是歸併到一起,然而reduceByKey是傳入reduce函數,執行reduce之後的結果。我們來看一個例子:

spark中的pair rdd,看這一篇就夠了

在這個例子當中我們執行了累加,把key值相同的value加在了一起。


foldByKey和fold的用法差別並不大,唯一不同的是我們加上了根據key值聚合的邏輯。如果我們把分區的初始值設置成0的話,那麼它用起來和reduceByKey幾乎沒有區別:

spark中的pair rdd,看這一篇就夠了

我們只需要清楚foldByKey當中的初始值針對的是分區即可。


combineByKey


這個也是一個很核心並且不太容易理解的轉化操作,我們先來看它的參數,它一共接受5個參數。我們一個一個來說,首先是第一個參數,是createCombiner


它的作用是初始化,將value根據我們的需要做初始化,比如將string類型的轉化成int,或者是其他的操作。我們用記號可以寫成是V => C,這裡的V就是value,C是我們初始化之後的新值。


它會和value一起被當成新的pair傳入第二個函數,所以第二個函數的接受參數是(C, V)的二元組。我們要做的是定義這個二元組的合併,所以第二個函數可以寫成(C, V) => C。源碼裡的註釋和網上的教程都是這麼寫的,但我覺得由於出現了兩個C,可能會讓人難以理解,我覺得可以寫成(C, V) => D,比較好。


最後一個函數是將D進行合併,所以它可以寫成是(D, D) => D。


到這裡我們看似好像明白了它的原理,但是又好像有很多問號,總覺得哪裡有些不太對勁。我想了很久,才找到了問題的根源,出在哪裡呢,在於合併。有沒有發現第二個函數和第三個函數都是用來合併的,為什麼我們要合併兩次,它們之間的區別是什麼?如果這個問題沒搞明白,那麼對於它的使用一定是錯誤的,我個人覺得這個問題才是這個轉化操作的核心,沒講清楚這個問題的博客都是不夠清楚的。


其實這兩次合併的邏輯大同小異,但是合併的範圍不一樣,第一次合併是針對分區的,第二次合併是針對key的。因為在spark當中數據可能不止存放在一個分區內,所以我們要合併兩次,第一次先將分區內部的數據整合在一起,第二次再跨分區合併。由於不同分區的數據可能相隔很遠,所以會導致網絡傳輸的時間過長,所以我們希望傳輸的數據儘量小,這才有了groupby兩次的原因。


我們再來看一個例子:

spark中的pair rdd,看這一篇就夠了

在這個例子當中我們計算了每個單詞出現的平均個數,我們一點一點來看。首先,我們第一個函數將value轉化成了(1, value)的元組,元組的第0號元素表示出現該單詞的文檔數,第1號元素表示文檔內出現的次數。所以第二個函數,也就是在分組內聚合的函數,我們對於出現的文檔數只需要加一即可,對於出現的次數要進行累加。因為這一次聚合的對象都是(1, value)類型的元素,也就是沒有聚合之前的結果。


在第三個函數當中,我們對於出現的總數也進行累加,是因為這一個函數處理的結果是各個分區已經聚合一次的結果了。比如apple在一個分區內出現在了兩個文檔內,一共出現了20次,在一個分區出現在了三個文檔中,一共出現了30次,那麼顯然我們一共出現在了5個文檔中,一共出現了50次。


由於我們要計算平均,所以我們要用出現的總次數除以出現的文檔數。最後經過map之後由於我們得到的還是一個二元組,我們不能直接collect,需要用collectAsMap。


我們把上面這個例子用圖來展示,會很容易理解:

spark中的pair rdd,看這一篇就夠了


連接操作


在spark當中,除了基礎的轉化操作之外,spark還提供了額外的連接操作給pair RDD。通過連接,我們可以很方便地像是操作集合一樣操作RDD。操作的方法也非常簡單,和SQL當中操作數據表的形式很像,就是join操作。join操作又可以分為join(inner join)、left join和right join。


如果你熟悉SQL的話,想必這三者的區別應該非常清楚,它和SQL當中的join是一樣的。如果不熟悉也沒有關係,解釋起來並不複雜。在join的時候我們往往是用一張表去join另外一張表,就好像兩個數相減,我們用一個數減去另外一個數一樣。比如A.join(B),我們把A叫做左表,B叫做右表。所謂的join,就是把兩張表當中某一個字段或者是某些字段值相同的行連接在一起。


比如一張表是學生表,一張表是出勤表。我們兩張表用學生的id一關聯,就得到了學生的出勤記錄。但是既然是集合關聯,

就會出現數據關聯不上的情況。比如某個學生沒有出勤,或者是出勤表裡記錯了學生id。對於數據關聯不上的情況,我們的處理方式有四種。第一種是全都丟棄,關聯不上的數據就不要了。第二種是全部保留,關聯不上的字段就記為NULL。第三種是左表關聯不上的保留,右表丟棄。第四種是右表保留,左表丟棄。


下圖展示了這四種join,非常形象。

spark中的pair rdd,看這一篇就夠了

我們看幾個實際的例子來體會一下。


首先創建數據集:

<code>ex1 = sc.parallelize([['frank', 30], ['bob', 9], ['silly', 3]])
ex2 = sc.parallelize([['frank', 80], ['bob', 12], ['marry', 22], ['frank', 21], ['bob', 22]])/<code>


接著,我們分別運行這四種join,觀察一下join之後的結果。

spark中的pair rdd,看這一篇就夠了

從結果當中我們可以看到,如果兩個數據集當中都存在多條key值相同的數據,spark會將它們兩兩相乘匹配在一起。


行動操作


最後,我們看下pair RDD的行動操作。pair RDD同樣是rdd,所以普通rdd適用的行動操作,同樣適用於pair rdd。但是除此之外,spark還為它開發了獨有的行動操作。


countByKey


countByKey這個操作顧名思義就是根據Key值計算每個Key值出現的條數,它等價於count groupby的SQL語句。我們來看個具體的例子:

spark中的pair rdd,看這一篇就夠了

collectAsMap


這個也很好理解,其實就是將最後的結果以map的形式輸出

spark中的pair rdd,看這一篇就夠了

從返回的結果可以看到,輸出的是一個dict類型。也就是Python當中的"map"。


lookup


這個單詞看起來比較少見,其實它代表的是根據key值查找對應的value的意思。也就是常用的get函數,我們傳入一個key值,會自動返回key值對應的所有的value。如果有多個value,則會返回list。

spark中的pair rdd,看這一篇就夠了


總結


到這裡,所有的pair RDD相關的操作就算是介紹完了。pair rdd在我們日常的使用當中出現的頻率非常高,利用它可以非常方便地實現一些比較複雜的操作。


另外,今天的這篇文章內容不少,想要完全吃透,需要一點功夫。這不是看一篇文章就可以實現的,但是也沒有關係,我們初學的時候只需要對這些api和使用方法有一個大概的印象即可,具體的使用細節可以等用到的時候再去查閱相關的資料。


今天的文章就是這些,如果覺得有所收穫,請順手點個關注或者轉發吧,你們的舉手之勞對我來說很重要。


分享到:


相關文章: