前言
Spark是一種大規模、快速計算的集群平臺,本頭條號試圖通過學習Spark官網的實戰演練筆記提升筆者實操能力以及展現Spark的精彩之處。有關框架介紹和環境配置可以參考以下內容:
本文的參考配置為:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12
一、彈性分佈式數據集(RDDs)
Spark 主要以彈性分佈式數據集(RDD)的概念為中心,它是一個容錯且可以執行並行操作的元素的集合。有兩種方法可以創建 RDD:在你的 driver program(驅動程序)中 parallelizing 一個已存在的集合,或者在外部存儲系統中引用一個數據集,例如,一個共享文件系統,HDFS,HBase,或者提供 Hadoop InputFormat 的任何數據源。
1.並行集合
例如在已存在的集合上通過調用 SparkContext 的 parallelize 方法來創建並行集合。
2.外部數據源
Spark 可以從 Hadoop 所支持的任何存儲源中創建 distributed dataset(分佈式數據集),包括
本地文件系統,HDFS,Cassandra,HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat。3.RDD操作
RDDs support 兩種類型的操作:transformations(轉換),它會在一個已存在的 dataset 上創建一個新的 dataset,和 actions(動作),將在 dataset 上運行的計算後返回到 driver 程序。
我們還可以傳遞函數給Spark。當 driver 程序在集群上運行時,Spark 的 API 在很大程度上依賴於傳遞函數。可以使用Anonymous function syntax(匿名函數語法),它可以用於短的代碼片斷.
大多數 Spark 操作工作在包含任何類型對象的 RDDs 上,只有少數特殊的操作可用於 Key-Value 對的 RDDs。最常見的是分佈式 “shuffle” 操作,如通過元素的 key 來進行 grouping 或 aggregating 操作.
3.RDD轉換操作
- map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD;
- Filter是對RDD元素進行過濾;返回一個新的數據集,是經過func函數後返回值為true的原元素組成
- flatMap類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素
- mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每個元素,而mapPartitions的輸入函數是每個分區的數據,也就是把每個分區中的內容作為整體來處理的
- mapPartitionsWithSplit與mapPartitions的功能類似, 只是多傳入split index而已,所有func 函數必需是 (Int, Iterator
) => Iterator 類型 - sample(withReplacement,fraction,seed)是根據給定的隨機種子seed,隨機抽樣出數量為frac的數據。withReplacement:是否放回抽樣;fraction:比例,0.1表示10% ;
- union(otherDataset)是數據合併,返回一個新的數據集,由原數據集和otherDataset聯合而成
- intersection(otherDataset)是數據交集,返回一個新的數據集,包含兩個數據集的交集數據;
- distinct([numTasks]))是數據去重,返回一個數據集,是對兩個數據集去除重複數據,numTasks參數是設置任務並行數量
- groupByKey([numTasks])是數據分組操作,在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。
- reduceByKey(func, [numTasks])是數據分組聚合操作,在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。
- aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U) =>U) 和reduceByKey的不同在於,reduceByKey輸入輸出都是(K, V),而aggreateByKey輸出是(K,U),aggreateByKey可以看成更高抽象的,更靈活的reduce或group
- combineByKey是對RDD中的數據集按照Key進行聚合操作。
- sortByKey([ascending],[numTasks])是排序操作,對(K,V)類型的數據按照K進行排序,其中K需要實現Ordered方法
- join(otherDataset, [numTasks])是連接操作,將輸入數據集(K,V)和另外一個數據集(K,W)進行Join, 得到(K, (V,W));
- cogroup(otherDataset, [numTasks])是將輸入數據集(K, V)和另外一個數據集(K, W)進行cogroup,得到一個格式為(K, Seq[V], Seq[W])的數據集
- cartesian(otherDataset)是做笛卡爾積:對於數據集T和U 進行笛卡爾積操作, 得到(T, U)格式的數據集
4.RDD行動操作
- reduce(func)是對數據集的所有元素執行聚集(func)函數,該函數必須是可交換的。
- collect是將數據集中的所有元素以一個array的形式返回
- count返回數據集中元素的個數。
- first返回數據集中的第一個元素, 類似於take(1)
- Take(n)返回一個包含數據集中前n個元素的數組
- takeSample(withReplacement,num, [seed])返回包含隨機的num個元素的數組
- takeOrdered(n, [ordering])是返回包含隨機的n個元素的數組,按照順序輸出
- saveAsTextFile把數據集中的元素寫到一個文本文件,Spark會對每個元素調用toString方法來把每個元素存成文本文件的一行。
- countByKey對於(K, V)類型的RDD. 返回一個(K, Int)的map, Int為K的個數
- foreach(func)是對數據集中的每個元素都執行func函數
5.持久化
Spark 中一個很重要的能力是將數據 persisting 持久化(或稱為 caching 緩存),在多個操作間都可以訪問這些持久化的數據。RDD 可以使用 persist() 方法或 cache() 方法進行持久化。
二、共享變量
通常情況下,一個傳遞給 Spark 操作(例如 map 或 reduce)的函數 func 是在遠程的集群節點上執行的。該函數 func 在多個節點執行過程中使用的變量,是同一個變量的多個副本。這些變量的以副本的方式拷貝到每個機器上,並且各個遠程機器上變量的更新並不會傳播回 driver program(驅動程序)。通用且支持 read-write(讀-寫)的共享變量在任務間是不能勝任的。所以,Spark 提供了兩種特定類型的共享變量:broadcast variables(廣播變量)和 accumulators(累加器)。
1.廣播變量
Broadcast variables(廣播變量)允許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務傳遞一個副本。廣播變量通過在一個變量 v 上調用 SparkContext.broadcast(v) 方法來進行創建。廣播變量是 v 的一個 wrapper(包裝器),可以通過調用 value 方法來訪問它的值。
在創建廣播變量之後,在集群上執行的所有的函數中,應該使用該廣播變量代替原來的 v 值,所以節點上的 v 最多分發一次。另外,對象 v 在廣播後不應該再被修改,以保證分發到所有的節點上的廣播變量具有同樣的值(例如,如果以後該變量會被運到一個新的節點)。
2.累加器
Accumulators(累加器)是一個僅可以執行 “added”(添加)的變量來通過一個關聯和交換操作,因此可以高效地執行支持並行。累加器可以用於實現 counter(計數,類似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數值型的累加器,並且程序員可以添加新的支持類型。
可以通過調用 sc.longAccumulator() 或sc.doubleAccumulator() 方法創建數值類型的 accumulator(累加器)以分別累加 Long 或 Double 類型的值。集群上正在運行的任務就可以使用 add 方法來累計數值。然而,它們不能夠讀取它的值。只有 driver program(驅動程序)才可以使用 value 方法讀取累加器的值。
RDD和共享變量的內容至此結束,下文將進一步對Spark SQL 、DataFrame、DataSets的內容做詳細介紹。前文筆記請參考下面的鏈接:
閱讀更多 雲飛大數據 的文章