五分鐘搞懂spark中RDD

今天是spark專題第二篇文章,我們來看spark非常重要的一個概念——RDD。


在上一講當中我們在本地安裝好了spark,雖然我們只有local一個集群,但是仍然不妨礙我們進行實驗。spark最大的特點就是無論集群的資源如何,進行計算的代碼都是一樣的,spark會自動為我們做分佈式調度工作


RDD概念


介紹spark離不開RDD,RDD是其中很重要的一個部分。但是很多初學者往往都不清楚RDD究竟是什麼,我自己也是一樣,我在系統學習spark之前代碼寫了一堆,但是對於RDD等概念仍然雲裡霧裡。


RDD的英文全名是Resilient Distributed Dataset,我把英文寫出來就清楚了很多。即使第一個單詞不認識,至少也可以知道它是一個分佈式的數據集。第一個單詞是彈性的意思,所以直譯就是彈性分佈式數據集。雖然我們還是不夠清楚,但是已經比只知道RDD這個概念清楚多了,


RDD是一個不可變的分佈式對象集合,每個RDD都被分為多個分區,這些分區運行在集群的不同節點上。


很多資料裡只有這麼一句粗淺的解釋,看起來說了很多,但是我們都get不到。細想有很多疑問,最後我在大神的博客裡找到了詳細的解釋,這位大神翻了spark的源碼,找到了其中RDD的定義,一個RDD當中包含以下內容:


  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)


我們一條一條來看:


  1. 它是一組分區,分區是spark中數據集的最小單位。也就是說spark當中數據是以分區為單位存儲的,不同的分區被存儲在不同的節點上。這也是分佈式計算的基礎。
  2. 一個應用在各個分區上的計算任務。在spark當中數據和執行的操作是分開的,並且spark基於懶計算的機制,也就是在真正觸發計算的行動操作出現之前,spark會存儲起來對哪些數據執行哪些計算。數據和計算之間的映射關係就存儲在RDD中。
  3. RDD之間的依賴關係,RDD之間存在轉化關係,一個RDD可以通過轉化操作轉化成其他RDD,這些轉化操作都會被記錄下來。當部分數據丟失的時候,spark可以通過記錄的依賴關係重新計算丟失部分的數據,而不是重新計算所有數據。
  4. 一個分區的方法,也就是計算分區的函數。spark當中支持基於hash的hash分區方法和基於範圍的range分區方法。
  5. 一個列表,存儲的是存儲每個分區的優先存儲的位置。


通過以上五點,我們可以看出spark一個重要的理念。即移動數據不如移動計算,也就是說在spark運行調度的時候,會傾向於將計算分發到節點,而不是將節點的數據蒐集起來計算。RDD正是基於這一理念而生的,它做的也正是這樣的事情。


創建RDD


spark中提供了兩種方式來創建RDD,一種是讀取外部的數據集,另一種是將一個已經存儲在內存當中的集合進行並行化


我們一個一個來看,最簡單的方式當然是並行化,因為這不需要外部的數據集,可以很輕易地做到。


在此之前,我們先來看一下SparkContext的概念,SparkContext是整個spark的入口,相當於程序的main函數。在我們啟動spark的時候,spark已經為我們創建好了一個SparkContext的實例,命名為sc,我們可以直接訪問到。

五分鐘搞懂spark中RDD


我們要創建RDD也需要基於sc進行,比如下面我要創建一個有字符串構成的RDD:

<code>texts = sc.parallelize(['now test', 'spark rdd'])/<code>


返回的texts就是一個RDD:

五分鐘搞懂spark中RDD


除了parallelize之外呢,我們還可以從外部數據生成RDD,比如我想從一個文件讀入,可以使用sc當中的textFile方法獲取:

<code>text = sc.textFile('/path/path/data.txt')/<code>


一般來說,除了本地調試我們很少會用parallelize進行創建RDD,因為這需要我們先把數據讀取在內存。由於內存的限制,使得我們很難將spark的能力發揮出來。


轉化操作和行動操作


剛才我們在介紹RDD的時候其實提到過,RDD支持兩種操作,一種叫做轉化操作(transformation)一種叫做行動操作(action)。


顧名思義,執行轉化操作的時候,spark會

將一個RDD轉化成另一個RDD。RDD中會將我們這次轉化的內容記錄下來,但是不會進行運算。所以我們得到的仍然是一個RDD而不是執行的結果。


比如我們創建了texts的RDD之後,我們想要對其中的內容進行過濾,只保留長度超過8的,我們可以用filter進行轉化:

<code>textAfterFilter = texts.filter(lambda x: len(x) > 8)/<code>


我們調用之後得到的也是一個RDD,就像我們剛才說的一樣,由於filter是一個轉化操作,所以spark只會記錄下它的內容,並不會真正執行。


轉化操作可以操作任意數量的RDD,比如如果我執行如下操作,會一共得到4個RDD:

<code>inputRDD = sc.textFile('path/path/log.txt')
lengthRDD = inputRDD.filter(lambda x: len(x) > 10)
errorRDD = inputRDD.filter(lambda x: 'error' in x)
unionRDD = errorRDD.union(lengthRDD)/<code>


最後的union會將兩個RDD的結果組合在一起

,如果我們執行完上述代碼之後,spark會記錄下這些RDD的依賴信息,我們把這個依賴信息畫出來,就成了一張依賴圖:

五分鐘搞懂spark中RDD


無論我們執行多少次轉化操作,spark都不會真正執行其中的操作,只有當我們執行行動操作時,記錄下來的轉化操作才會真正投入運算。像是first(),take(),count()等都是行動操作,這時候spark就會給我們返回計算結果了。

五分鐘搞懂spark中RDD


其中first的用處是返回第一個結果,take需要傳入一個參數,指定返回的結果條數,count則是計算結果的數量。和我們逾期的一樣,當我們執行了這些操作之後,spark為我們返回了結果。


本文著重講的是RDD的概念,我們下篇文章還會著重對轉化操作和行動操作進行深入解讀。感興趣的同學不妨期待一下吧~


今天的文章就是這些,如果覺得有所收穫,請順手點個關注或者轉發吧,你們的舉手之勞對我來說很重要。


分享到:


相關文章: