學大數據一點也不難!一文帶你瞭解RDD與共享變量(附安裝教程)

​前言

Spark是一種大規模、快速計算的集群平臺,本頭條號試圖通過學習Spark官網的實戰演練筆記提升筆者實操能力以及展現Spark的精彩之處。有關框架介紹和環境配置可以參考以下內容:

本文的參考配置為:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12

一、彈性分佈式數據集(RDDs)

Spark 主要以彈性分佈式數據集(RDD)的概念為中心,它是一個容錯且可以執行並行操作的元素的集合。有兩種方法可以創建 RDD:在你的 driver program(驅動程序)中 parallelizing 一個已存在的集合,或者在外部存儲系統中引用一個數據集,例如,一個共享文件系統,HDFS,HBase,或者提供 Hadoop InputFormat 的任何數據源。

1.並行集合

例如在已存在的集合上通過調用 SparkContext parallelize 方法來創建並行集合。

2.外部數據源

Spark 可以從 Hadoop 所支持的任何存儲源中創建 distributed dataset(分佈式數據集),包括

本地文件系統HDFSCassandraHBaseAmazon S3 等等。Spark 支持文本文件,SequenceFiles,以及任何其它的 Hadoop InputFormat

3.RDD操作

RDDs support 兩種類型的操作:transformations(轉換),它會在一個已存在的 dataset 上創建一個新的 dataset,和 actions(動作),將在 dataset 上運行的計算後返回到 driver 程序。

我們還可以傳遞函數給Spark。當 driver 程序在集群上運行時,Spark 的 API 在很大程度上依賴於傳遞函數。可以使用Anonymous function syntax(匿名函數語法),它可以用於短的代碼片斷.

大多數 Spark 操作工作在包含任何類型對象的 RDDs 上,只有少數特殊的操作可用於 Key-Value 對的 RDDs。最常見的是分佈式 “shuffle” 操作,如通過元素的 key 來進行 grouping 或 aggregating 操作.

3.RDD轉換操作

  • map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD;
  • Filter是對RDD元素進行過濾;返回一個新的數據集,是經過func函數後返回值為true的原元素組成
  • flatMap類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素
  • mapPartitions是map的一個變種。map的輸入函數是應用於RDD中每個元素,而mapPartitions的輸入函數是每個分區的數據,也就是把每個分區中的內容作為整體來處理的
  • mapPartitionsWithSplit與mapPartitions的功能類似, 只是多傳入split index而已,所有func 函數必需是 (Int, Iterator) => Iterator 類型
  • sample(withReplacement,fraction,seed)是根據給定的隨機種子seed,隨機抽樣出數量為frac的數據。withReplacement:是否放回抽樣;fraction:比例,0.1表示10% ;
  • union(otherDataset)是數據合併,返回一個新的數據集,由原數據集和otherDataset聯合而成
  • intersection(otherDataset)是數據交集,返回一個新的數據集,包含兩個數據集的交集數據;
  • distinct([numTasks]))是數據去重,返回一個數據集,是對兩個數據集去除重複數據,numTasks參數是設置任務並行數量
  • groupByKey([numTasks])是數據分組操作,在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。
  • reduceByKey(func, [numTasks])是數據分組聚合操作,在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。
  • aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U) =>U) 和reduceByKey的不同在於,reduceByKey輸入輸出都是(K, V),而aggreateByKey輸出是(K,U),aggreateByKey可以看成更高抽象的,更靈活的reduce或group
  • combineByKey是對RDD中的數據集按照Key進行聚合操作。
  • sortByKey([ascending],[numTasks])是排序操作,對(K,V)類型的數據按照K進行排序,其中K需要實現Ordered方法
  • join(otherDataset, [numTasks])是連接操作,將輸入數據集(K,V)和另外一個數據集(K,W)進行Join, 得到(K, (V,W));
  • cogroup(otherDataset, [numTasks])是將輸入數據集(K, V)和另外一個數據集(K, W)進行cogroup,得到一個格式為(K, Seq[V], Seq[W])的數據集
  • cartesian(otherDataset)是做笛卡爾積:對於數據集T和U 進行笛卡爾積操作, 得到(T, U)格式的數據集

4.RDD行動操作

  • reduce(func)是對數據集的所有元素執行聚集(func)函數,該函數必須是可交換的。
  • collect是將數據集中的所有元素以一個array的形式返回
  • count返回數據集中元素的個數。
  • first返回數據集中的第一個元素, 類似於take(1)
  • Take(n)返回一個包含數據集中前n個元素的數組
  • takeSample(withReplacement,num, [seed])返回包含隨機的num個元素的數組
  • takeOrdered(n, [ordering])是返回包含隨機的n個元素的數組,按照順序輸出
  • saveAsTextFile把數據集中的元素寫到一個文本文件,Spark會對每個元素調用toString方法來把每個元素存成文本文件的一行。
  • countByKey對於(K, V)類型的RDD. 返回一個(K, Int)的map, Int為K的個數
  • foreach(func)是對數據集中的每個元素都執行func函數

5.持久化

Spark 中一個很重要的能力是將數據 persisting 持久化(或稱為 caching 緩存),在多個操作間都可以訪問這些持久化的數據。RDD 可以使用 persist() 方法或 cache() 方法進行持久化。

二、共享變量

通常情況下,一個傳遞給 Spark 操作(例如 map 或 reduce)的函數 func 是在遠程的集群節點上執行的。該函數 func 在多個節點執行過程中使用的變量,是同一個變量的多個副本。這些變量的以副本的方式拷貝到每個機器上,並且各個遠程機器上變量的更新並不會傳播回 driver program(驅動程序)。通用且支持 read-write(讀-寫)的共享變量在任務間是不能勝任的。所以,Spark 提供了兩種特定類型的共享變量:broadcast variables(廣播變量)accumulators(累加器)

1.廣播變量

Broadcast variables(廣播變量)允許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務傳遞一個副本。廣播變量通過在一個變量 v 上調用 SparkContext.broadcast(v) 方法來進行創建。廣播變量是 v 的一個 wrapper(包裝器),可以通過調用 value 方法來訪問它的值。

在創建廣播變量之後,在集群上執行的所有的函數中,應該使用該廣播變量代替原來的 v 值,所以節點上的 v 最多分發一次。另外,對象 v 在廣播後不應該再被修改,以保證分發到所有的節點上的廣播變量具有同樣的值(例如,如果以後該變量會被運到一個新的節點)。

2.累加器

Accumulators(累加器)是一個僅可以執行 “added”(添加)的變量來通過一個關聯和交換操作,因此可以高效地執行支持並行。累加器可以用於實現 counter(計數,類似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持數值型的累加器,並且程序員可以添加新的支持類型。

可以通過調用 sc.longAccumulator() sc.doubleAccumulator() 方法創建數值類型的 accumulator(累加器)以分別累加 Long 或 Double 類型的值。集群上正在運行的任務就可以使用 add 方法來累計數值。然而,它們不能夠讀取它的值。只有 driver program(驅動程序)才可以使用 value 方法讀取累加器的值。

RDD和共享變量的內容至此結束,下文將進一步對Spark SQL 、DataFrame、DataSets的內容做詳細介紹。前文筆記請參考下面的鏈接:


分享到:


相關文章: