Spark大數據分佈式處理實戰:一文帶你走進大數據世界

前言

Spark是一種大規模、快速計算的集群平臺,本公眾號試圖通過學習Spark官網的實戰演練筆記提升筆者實操能力以及展現Spark的精彩之處。有關框架介紹和環境配置可以參考以下內容:

本文的參考配置為:Deepin 15.11、Java 1.8.0_241、Hadoop 2.10.0、Spark 2.4.4、scala 2.11.12

一、開啟服務

首先開啟Hadoop服務(包括HDFS和yarn服務),再開啟Spark的主節點及從節點服務。

Spark大數據分佈式處理實戰:一文帶你走進大數據世界

Spark大數據分佈式處理實戰:一文帶你走進大數據世界

再通過spark-shell命令啟用spark命令行模式:

Spark大數據分佈式處理實戰:一文帶你走進大數據世界

之後就可以通過鏈接進入spark的web界面:

Spark大數據分佈式處理實戰:一文帶你走進大數據世界

二、Spark基礎操作

Spark 的主要抽象是一個稱為 Dataset 的分佈式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉換來創建。我們先複製一段英文文本,並保存為test文件:

<code>1.While there is life there is hope.
2.I am a slow walker,but I never walk backwards.
3.Never underestimate your power to change yourself!
4.Nothing is impossible!
5.Nothing for nothing.
6.The man who has made up his mind to win will never say "impossible".
7.I will greet this day with love in my heart.
8.Do what you say,say what you do
9.I can make it through the rain. I can stand up once again on my own.
10.All things come to those who wait./<code>

通過spark.read.textFile方法讀取該文件並轉換為Datesets。我們可以直接從 Dataset 中獲取 values(值),通過調用一些 actions(動作),或者 transform(轉換)Dataset 以獲得一個新的。更多細節,請參閱Spark API。

<code>scala> val textFile = spark.read.textFile("file:///home/phenix/Documents/spark/data/test")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]​

scala> textFile.count()
res0: Long = 10​

scala> textFile.first()
res1: String = 1.While there is life there is hope./<code>

現在讓我們 transform 這個 Dataset 以獲得一個新的 。我們調用 filter 以返回一個新的 Dataset,它是文件中的 items 的一個子集。同樣我們可以鏈式操作 transformation(轉換)和 action(動作):

<code>scala> val lineswithnever = textFile.filter(line => line.contains("never"))
lineswithnever: org.apache.spark.sql.Dataset[String] = [value: string]

scala> lineswithnever
res4: org.apache.spark.sql.Dataset[String] = [value: string]
scala> lineswithnever.collect()

res5: Array[String] = Array(2.I am a slow walker,but I never walk backwards., 6.The man who has made up his mind to win will never say "impossible".)// 鏈式操作

scala> lineswithnever.count()
res6: Long = 2/<code>

三、Dataset上的更多操作

Dataset actions(操作)transformations(轉換)可以用於更復雜的計算。例如,統計出現單詞最多的行,結果為16。通過查看原始文件可知是第九行單詞數最多為16個。

<code>scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res0: Int = 16/<code>

第一個 map 操作創建一個新的 Dataset,將一行數據 map 為一個整型值。在 Dataset 上調用 reduce 來找到最大的行計數。參數 map 與 reduce 是 Scala 函數(closures),並且可以使用 Scala/Java 庫的任何語言特性。

一種常見的數據流模式是被 Hadoop 所推廣的 MapReduce。Spark 可以很容易實現 MapReduce:

<code>scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() 

wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint

scala> wordCounts.collect()
res7: Array[(String, Long)] = Array((those,1), (slow,1), (rain.,1), (day,1), (7.I,1), (own.,1),.../<code>

在這裡,我們調用了 flatMap 以 transform 一個 lines 的 Dataset 為一個 words 的 Dataset,然後結合 groupByKey 和 count 來計算文件中每個單詞的 counts 作為一個 (String, Long) 的 Dataset pairs。要在 shell 中,我們可以調用 collect來收集 word counts。

四、緩存

Spark 還支持 Pulling(拉取)數據集到一個群集範圍的內存緩存中。例如當查詢一個小的 “hot” 數據集或運行一個像 PageRANK 這樣的迭代算法時,在數據被重複訪問時是非常高效的。舉一個簡單的例子,讓我們標記我們的 linesWithSpark 數據集到緩存中:

<code>scala> lineswithnever.cache()
res2: lineswithnever.type = [value: string]

scala> lineswithnever.count()
res3: Long = 2

scala> lineswithnever.count()
res4: Long = 2/<code>

五、應用程序

我們還可以使用 Spark API 來創建一個獨立的應用程序。我們將在 Scala 中創建一個非常簡單的 Spark 應用程序(應用程序打包支持Scala(

SBT),Java(Maven),Python).首先我們創建SimpleApp.scala文件,該程序僅僅統計了之前文本文件中每一行包含 ‘a’ 的數量和包含 ‘b’ 的數量。

<code>/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "file:///home/phenix/Documents/spark/data/test" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}/<code>

通過:load 命令加載scala文件並執行

<code>scala> :load /home/phenix/Documents/spark/code/SimpleApp.scala
Loading /home/phenix/Documents/spark/code/SimpleApp.scala...
import org.apache.spark.sql.SparkSession
defined object SimpleApp

scala> SimpleApp.main(Array("test"))
20/02/21 13:28:28 WARN sql.SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.Lines with a: 7, Lines with b: 3                                                /<code>

我們調用 SparkSession.builder 以構造一個 [[SparkSession]],然後設置 application name(應用名稱),最終調用 getOrCreate 以獲得 [[

SparkSession]] 實例。

至此我們已經完成Spark的快速開始環節,下文將進一步講解RDD、累加器及廣播變量的概念及使用。


分享到:


相關文章: