Spark SQL與DataFrame詳解以及使用

Spark SQL是Spark的一個結構化數據處理模塊,提供一個DataFrame編程模型抽象,可以看做是一個分佈式SQL查詢引擎。Spark SQL主要由Catalyst優化,Spark SQL內核,Hive支持三部分組成。

Catalyst優化處理查詢語句的整個過程,包括解析,綁定,優化,物理計劃等,主要由關係代數,表達式以及查詢優化組成。

Spark SQL內核處理數據的輸入輸出,從不同的數據源(結構化Parquet文件和JSON文件,Hive表,外部數據庫,創建RDD)獲取數據,執行查詢,並將結果輸出成DataFrame。

Hive支持是值對Hive數據的處理,主要包括HiveSQL,MetaStore,SerDes,UDFS等。

Spark SQL架構

Spark SQL對SQL語句的處理和關係型數據庫SQL處理類似,將SQL語句解析成一顆數(Tree),然後通過規則(rule)的模式匹配,對樹進行綁定,優化等,然後得到查詢結果。

Tree的具體操作是通過TreeNode實現的;Rule是一個抽象類,是通過RuleExecutor完成的,應用與Spark SQL的Analyzer,Optimizer,Spark Planner等組件中,可以簡便,模塊化地對Tree進行Transform操作。

在整個SQL語句執行的過程中,主要依賴了優化框架Catalyst,把SQL語句解析,綁定,優化等,最終將邏輯計劃優化後並且轉換為物理執行計劃,最後變成DataFrame模型。

Spark SQL的整個架構圖如下所示:

Spark SQL與DataFrame詳解以及使用

  1. 使用SqlParser對SQL語句進行解析,生成Unresolved邏輯計劃(沒有提取schema信息)。
  2. 使用Catalyst分析器,結合數據字典(catalog)進行綁定,生成Analyzed邏輯計劃,在此過程中,Schema Catalog則要提取schema信息。
  3. 使用Catalyst優化器對Analyzed邏輯計劃進行優化,按照優化規則得到Optimized邏輯計劃。
  4. 接著和Spark Planner交互,使用相應的策略將邏輯計劃轉換為物理計劃,然後調用next函數,生成可執行物理計劃。
  5. 調用toDF,最後生成DataFrame。

Spark SQL有以下幾點特徵:

1.兼容多種數據格式,如上面所說的parquet文件,HIve表,JSON文件等等。

2.方便擴展,它的優化器,解析器都可以重新定義。

3.性能優化方面:採用了內存列式存儲,動態字節碼生成等技術,還採用了內存緩存數據。

4.支持多種語言操作,包括JAVA,SCALA,PYTHON,R語言等。

Spark SQL性能

1.內存列式存儲

Spark SQL內部使用內存列式模式緩存表,僅掃描需要的列,並且自動調整壓縮比使內存使用率和GC壓力最小化,如果緩存了數據,則下次執行時不需要重複讀取數據。

2.動態代碼生成和字節碼生成技術

對於一個簡單的查詢,如:

select a+b from table1

這個查詢,在傳統的方式中,會將SQL語句生成一個表達樹,然後調用虛函數確認Add兩邊的數據類型,然後再調用虛函數計算裝箱最後返回結果,計算時需要多次涉及虛函數的使用,打斷了CPU的利用,減緩了執行效率。

因此,使用字節碼技術會將表達式使用特定的代碼動態 編譯,為每一個查詢生成自定義字節碼,然後運行。最後需要說明的是DataFrame也是惰性的,在遇見Action操作的時候才會真正的去執行。

創建DataFrame

創建一個json格式文件,內容如下所示;

{"name":"Mirckel"}{"name":"Andy","age":30}{"name":"Jsutin","age":13}123

創建以及相關操作如下代碼所示:

 val sqlContext=new sql.SQLContext(sc) val df=sqlContext.jsonFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql01.json") println(df.show()) //打印表數據 println(df.printSchema()) //以樹的形式打印DataFrame的Schema println(df.select(df("name"),df("age")+1).show())12345678

下面有兩種方式來操作RDD數據源,即將現有的RDD轉化為DataFrame。

操作的文件內容格式如下:

shinelon,19mike,20wangwu,25123

1.以反射機制推斷RDD模式

主要分下面三個步驟:

  1. 必須創建case類,只有case類才能隱式轉換為DataFrame。
  2. 必須生成DataFrame,進行註冊臨時表操作。
  3. 必須在內存中register成臨時表,才能供查詢使用。
 val conf=new SparkConf() .setMaster("local") .setAppName("SparkSqlDemo01") val sc=new SparkContext(conf) val sqlContext=new sql.SQLContext(sc) //使用case定義Schema(不能超過22個屬性),實現Person接口 //只有case類才能隱式轉換為一個DataFrame case class Person(name:String,age:Int) //使用前綴hdfs://來標識HDFS存儲系統的文件 val people=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt") .map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF //DataFrame註冊臨時表 people.refisterTempTable("person") //使用sql運行SQL表達式 val result=sqlContext.sql("SELECT name,age from person WHERE age>=19") println(result.map(t=>"Name:"+t(0)).collect()) println(result.map(t=>"Name:"+t.getAs[String](1)).collect())12345678910111213141516171819202122

2.以編程方式定義RDD模型

主要分三個步驟:

  1. 從原始RDD中創建一個Rows的RDD。
  2. 創建一個表示為StructType類型的Schema,匹配在第一步創建的RDD的Rows的結構。
  3. 通過SQLContext提供的createDataFrame方法,應用Schema到Rows的RDD。

代碼如下所示:

 val conf=new SparkConf() .setMaster("local") .setAppName("SparkSqlDemo01") val sc=new SparkContext(conf) val sqlContext=new sql.SQLContext(sc)val sqlContext=new sql.SQLContext(sc) //使用case定義Schema(不能超過22個屬性),實現Person接口 //只有case類才能隱式轉換為一個DataFrame case class Person(name:String,age:Int) //使用前綴hdfs://來標識HDFS存儲系統的文件 val people=sc.textFile("file:///F:/spark-2.0.0/SparkApp/src/cn/just/shinelon/txt/SparkSql02.txt") .map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF //DataFrame註冊臨時表 people.refisterTempTable("person") //使用sql運行SQL表達式 val result=sqlContext.sql("SELECT name,age from person WHERE age>=19") println(result.map(t=>"Name:"+t(0)).collect()) println(result.map(t=>"Name:"+t.getAs[String](1)).collect())12345678910111213141516171819202122

至此,就介紹了Spark SQL的基本原理以及初步使用,Spark SQL憑藉著內存的快速訪問快速在大數據界發展,有人說它取代了HIVE,這種說法過於狹隘,Hive還是應用廣泛,只能說代替了Hive的 部分功能,並不能完全代替Hive來使用。


分享到:


相關文章: