spark——rdd常用的轉化和行動操作

今天是spark第三篇文章,我們繼續來看RDD的一些操作。


我們前文說道在spark當中RDD的操作可以分為兩種,一種是轉化操作(transformation),另一種是行動操作(action)。在轉化操作當中,spark不會為我們計算結果,而是會生成一個新的RDD節點,記錄下這個操作。只有在行動操作執行的時候,spark才會從頭開始計算整個計算。


而轉化操作又可以進一步分為針對元素的轉化操作以及針對集合的轉化操作。


針對元素的轉化操作


針對元素的轉化操作非常常用,其中最常用的就是map和flatmap。從名字上看這兩者都是map操作,map操作我們都知道,在之前的MapReduce文章以及Python map、reduce用法的文章當中都有提及。簡而言之就是可以將一個操作映射在每一個元素上。


比如假設我們有一個序列[1, 3, 4, 7],我們希望將當中每一個元素執行平方操作。我們當然可以用for循環執行,但是在spark當中更好的辦法是使用map。

<code>nums = sc.parallelize([1, 3, 4, 7])
spuare = nums.map(lambda x: x * x)/<code>


我們知道map是一個轉化操作,所以square仍然是一個RDD,我們直接將它輸出不會得到結果,只會得到RDD的相關信息:

spark——rdd常用的轉化和行動操作


內部RDD的轉化圖是這樣的:

spark——rdd常用的轉化和行動操作


我們想看結果就必須要執行行動操作,比如take,我們take一下查看一下結果:

spark——rdd常用的轉化和行動操作

和我們的預期一致,對於之前一直關注的同學來說map操作應該已經很熟悉了,那麼這個flatmap又是什麼呢?


差別就在這個flat,我們都知道flat是扁平的意思,所以flatmap就是說map執行之後的結果扁平化。說白了也就是說如果map執行之後的結果是一個數組的話,那麼會將數組拆開,把裡面的內容拿出來組合到一起。


我們一起來看一個例子:

<code>texts = sc.parallelize(['now test', 'spark rdd'])
split = texts.map(lambda x: x.split(' '))/<code>


由於我們執行map的對象是一個字符串,一個字符串執行split操作之後會得到一個字符串數組。如果我們執行map,得到的結果會是:

spark——rdd常用的轉化和行動操作


如果我們執行flatmap呢?我們也可以試一下:

spark——rdd常用的轉化和行動操作

對比一下,有沒有注意到差別?


是了,map執行的結果是一個array的array,因為每一個string split之後就是一個array,我們把array拼接到一起自然是一個array的array。而flatMap會把這些array攤平之後放在一起,這也是兩者最大的差別。


針對集合的轉化操作


上面介紹了針對元素的轉化操作,下面來看看針對集合的轉化操作。


針對集合的操作大概有union,distinct,intersection和subtract這幾種。我們可以先看下下圖有一個直觀的感受,之後我們再一一分析:

spark——rdd常用的轉化和行動操作

首先來看distinct,這個顧名思義,就是去除重複。和SQL當中的distinct是一樣的,這個操作的輸入是兩個集合RDD,執行之後會生成一個新的RDD,這個RDD當中的所有元素都是unique的。有一點需要注意,執行distinct的開銷很大,因為它會執行shuffle操作將所有的數據進行亂序,以確保每個元素只有一份。如果你不明白shuffle操作是什麼意思,沒有關係,我們在後序的文章當中會著重講解。只需要記住它的開銷很大就行了。


第二種操作是union,這個也很好理解,就是把兩個RDD當中的所有元素合併。你可以把它當成是Python list當中的extend操作,同樣和extend一樣,它並不會做重複元素的檢測,所以如果合併的兩個集合當中有相同的元素並不會被過濾,而是會被保留。


第三個操作是intersection,它的意思是交集,也就是兩個集合重疊的部分。這個應該蠻好理解的,我們看下下圖:

spark——rdd常用的轉化和行動操作

下圖當中藍色的部分,也就是A和B兩個集合的交集部分就是A.intersection(B)的結果,也就是兩個集合當中共有的元素。同樣,這個操作也會執行shuffle,所以開銷一樣很大,並且這個操作會去掉重複的元素。


最後一個是subtract,也就是差集,就是屬於A不屬於B的元素,同樣我們可以用圖來表示:

spark——rdd常用的轉化和行動操作

上圖當中灰色陰影部分就是A和B兩個集合的差集,同樣,這個操作也會執行shuffle,非常耗時。


除了以上幾種之外,還有cartesian,即笛卡爾積,sample抽樣等集合操作,不過相對而言用的稍微少一些,這裡就不過多介紹了,感興趣的同學可以瞭解一下,也並不複雜。


行動操作


RDD中最常用的行動操作應該就是獲取結果的操作了,畢竟我們算了半天就是為了拿結果,只獲取RDD顯然不是我們的目的。獲取結果的RDD主要是take,top和collect,這三種沒什麼特別的用法,簡單介紹一下。


其中collect是獲取所有結果,會返回所有的元素。take和top都需要傳入一個參數指定條數,take是從RDD中返回指定條數的結果,top是從RDD中返回最前面的若干條結果,top和take的用法完全一樣,唯一的區別就是拿到的結果是否是最前面的。


除了這幾個之外,還有一個很常用的action是count,這個應該也不用多說,計算數據條數的操作,count一下就可以知道有多少條數據了。


reduce


除了這些比較簡單的之外,再介紹另外兩個比較有意思的,首先,先來介紹reduce。reduce顧名思義就是MapReduce當中的reduce,它的用法和Python當中的reduce幾乎完全一樣,它接受一個函數來進行合併操作。我們來看個例子:

spark——rdd常用的轉化和行動操作


在這個例子當中,我們的reduce函數是將兩個int執行加和,reduce機制會重複執行這個操作將所有的數據合併,所以最終得到的結果就是1 + 3 + 4 + 7 = 15.


fold


除了reduce之外還有一個叫做fold的action,它和reduce完全一樣,唯一不同的是它可以自定義一個初始值,並且是針對分區的,我們還拿上面的例子舉例:

spark——rdd常用的轉化和行動操作

直接看這個例子可能有點懵逼,簡單解釋一下就明白了,其實不復雜。我們注意到我們在使用parallelize創造數據的時候多加了一個參數2,這個

2表示分區數。簡單可以理解成數組[1, 3, 4, 7]會被分成兩部分,但是我們直接collect的話還是原值。


現在我們使用fold,傳入了兩個參數,除了一個函數之外還傳入了一個初始值2。所以整個計算過程是這樣的:


對於第一個分區的答案是1 + 3 + 2 = 6,對於第二個分區的答案是4 + 7 + 2 = 13,最後將兩個分區合併:6 + 13 + 2 = 21。


也就是說我們對於每個分區的結果賦予了一個起始值,並且對分區合併之後的結果又賦予了一個起始值。


aggregate


老實講這個action是最難理解的,因為它比較反常。首先,對於reduce和fold來說都有一個要求就是返回值的類型必須和rdd的數據類型相同。比如數據的類型是int,那麼返回的結果也要是int。


但是對於有些場景這個是不適用的,比如我們想求平均,我們需要知道term的和,也需要知道term出現的次數,所以我們需要返回兩個值。這個時候我們初始化的值應該是0, 0,也就是對於加和與計數而言都是從0開始的,接著我們需要傳入兩個函數,比如寫成這樣:

<code>nums.aggregate((0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))/<code>


看到這行代碼會懵逼是必然的,不用擔心,我們一點一點解釋。


首先是第一個lambda函數,這裡的x不是一個值而是兩個值,或者說是一個二元組,也就是我們最後返回的結果,在我們的返回預期裡,第一個返回的數是nums的和,第二個返回的數是nums當中數的個數。而這裡的y則是nums輸入的結果,顯然nums輸入的結果只有一個int,所以這裡的y是一維的。那麼我們要求和當然是用x[0] + y,也就是說把y的值加在第一維上,第二維自然是加一,因為我們每讀取一個數就應該加一。


這點還比較容易理解,第二個函數可能有些費勁,第二個函數和第一個不同,它不是用在處理nums的數據的,而是用來處理分區的。當我們執行aggregate的時候,spark並不是單線程執行的,它會將nums中的數據拆分成許多分區,每個分區得到結果之後需要合併,合併的時候會調用這個函數。


和第一個函數類似,第一個x是最終結果,而y則是其他分區運算結束需要合併進來的值。所以這裡的y是二維的,第一維是某個分區的和,第二維是某個分區當中元素的數量,那麼我們當然要把它都加在x上。

spark——rdd常用的轉化和行動操作

上圖展示了兩個分區的時候的計算過程,其中lambda1就是我們傳入的第一個匿名函數,同理,lambda2就是我們傳入的第二個匿名函數。我想結合圖應該很容易看明白。


行動操作除了這幾個之外還有一些,由於篇幅原因我們先不贅述了,在後序的文章當中如果有出現,我們會再進行詳細解釋的。初學者學習spark比較抗拒的一個主要原因就是覺得太過複雜,就連操作還區分什麼轉化操作和行動操作。其實這一切都是為了惰性求值從而優化性能。這樣我們就可以把若干個操作合併在一起執行,從而減少消耗的計算資源,對於分佈式計算框架而言,性能是非常重要的指標,理解了這一點,spark為什麼會做出這樣的設計也就很容易理解了。


不僅spark如此,TensorFlow等深度學習框架也是如此,本質上許多看似反直覺的設計都是有更深層的原因的,理解了之後其實也很容易猜到,凡是拿到最終結果的操作往往都是行動操作,如果只是一些計算,那麼十有八九是轉化操作。


持久化操作


Spark當中的RDD是惰性求值的,有的時候我們會希望多次使用同一個RDD。如果我們只是簡單地調用行動操作,那麼spark會多次重複計算RDD和它對應的所有數據以及其他依賴,這顯然會帶來大量開銷。我們很自然地會希望對於我們經常使用的RDD可以緩存起來,在我們需要的時候隨時拿來用,而不是每次用到的時候都需要重新跑。


為了解決這個問題,spark當中提供了持久化的操作。所謂的持久化可以簡單理解成緩存起來。用法也很簡單,我們只需要對RDD進行persist即可:

<code>texts = sc.parallelize(['now test', 'hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()/<code>


調用完持久化之後,RDD會被緩存進內存或磁盤當中,我們需要的時候可以隨時調出來使用,就不用把前面的整個流程全部跑一遍了。並且spark當中支持多種級別的持久化操作,我們可以通過

StorageLevel的變量來控制。我們來看下這個StorageLevel的取值:

spark——rdd常用的轉化和行動操作

我們根據需要選擇對應的緩存級別即可。當然既然有持久化自然就有反持久化,對於一些已經不再需要緩存的RDD,我們可以調用unpersist將它們從緩存當中去除。


今天的內容雖然看起來各種操作五花八門,但是有些並不是經常用到,我們只需要大概有個印象,具體操作的細節可以等用到的時候再做仔細的研究。希望大家都能

忽略這些並不重要的細節,抓住核心的本質。


今天的文章就是這些,如果覺得有所收穫,請順手點個關注或者轉發吧,你們的舉手之勞對我來說很重要。


分享到:


相關文章: