【導讀:數據是二十一世紀的石油,蘊含巨大價值,這是·情報通·大數據技術系列第[82]篇文章,歡迎閱讀和收藏】
1 背景介紹
SparkSQL 引入了一種新的 RDD —— SchemaRDD , SchemaRDD 由行對象( Row )以及描述行對象中每列數據類型的 Schema 組成; SchemaRDD 很像傳統數據庫中的表。 SchemaRDD 可以通過 RDD 、 Parquet 文件、 JSON 文件、或者通過使用 hiveql 查詢 hive 數據來建立。 SchemaRDD 除了可以和 RDD 一樣操作外,還可以通過 registerTempTable 註冊成臨時表,然後通過 SQL 語句進行操作。
值得注意的是:
l Spark1.1 使用 registerTempTable 代替 1.0 版本的 registerAsTable
l Spark1.1 在 hiveContext 中, hql() 將被棄用, sql() 將代替 hql() 來提交查詢語句,統一了接口。
l 使用 registerTempTable 註冊表是一個臨時表,生命週期只在所定義的 sqlContext 或 hiveContext 實例之中。換而言之,在一個 sqlContext (或 hiveContext )中 registerTempTable 的表不能在另一個 sqlContext (或 hiveContext )中使用。
l Spark1.1 提供了語法解析器選項 spark.sql.dialect ,就目前而言, Spark1.1 提供了兩種語法解析器: sql 語法解析器和 hiveql 語法解析器。
l sqlContext 現在只支持 sql 語法解析器( SQL-92 語法)
l hiveContext 現在支持 sql 語法解析器和 hivesql 語法解析器,默認為 hivesql 語法解析器,用戶可以通過配置切換成 sql 語法解析器,來運行 hiveql 不支持的語法,如 select 1 。
切換可以通過下列方式完成:
l 在 sqlContexet 中使用 setconf 配置 spark.sql.dialect
l 在 hiveContexet 中使用 setconf 配置 spark.sql.dialect
l 在 sql 命令中使用 set spark.sql.dialect=value
SparkSQL1.1 對數據的查詢分成了 2 個分支: sqlContext 和 hiveContext 。至於兩者之間的關係, hiveSQL 繼承了 sqlContext ,所以擁有 sqlontext 的特性之外,還擁有自身的特性(最大的特性就是支持 hive )。
2 sqlContext
2.1 使用 Case Class 定義 RDD
對於 Case Class 方式,首先要定義 Case Class ,在 RDD 的 Transform 過程中使用 Case Class 可以隱式轉化成 SchemaRDD ,然後再使用 registerTempTable 註冊成表。註冊成表後就可以在 sqlContext 對錶進行操作,如 select 、 insert 、 join 等。注意, case class 可以是嵌套的,也可以使用類似 Sequences 或 Arrays 之類複雜的數據類型。
應用案例:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext.implicits._
scala> case class Person(name:String,age:Int)
scala> val people=sc.textFile("hdfs://cloud25:9000/data/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
scala> teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
2.2 Parquest 應用案例
sqlContext 可以讀取 parquet 文件,由於 parquet 文件中保留了 schema 的信息,所以不需要使用 case class 來隱式轉換。 sqlContext 讀入 parquet 文件後直接轉換成 SchemaRDD ,也可以將 SchemaRDD 保存成 parquet 文件格式。
應用案例:
scala> import sqlContext.implicits._
scala> sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") // 解決文件中 parquet 中 binary 字段的問題
scala> val wikiData = sqlContext.parquetFile("hdfs://cloud25:9000/data/wiki_parquet").toDF()
scala> wikiData.count()
scala> wikiData.registerTempTable("wikidata")
scala> val countResult = sqlContext.sql("SELECT COUNT(*) FROM wikiData").collect()
scala> val queryResult= sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10")
scala> queryResult.collect().foreach(println)
2.3 Join 應用案例
sqlContext 可以從多個種類的 SchemaRDD 中執行 join 操作
應用案例:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> import sqlContext.implicits._
scala> case class Person(name:String,age:Int)
scala> val people=sc.textFile("hdfs://cloud25:9000/data/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()
scala> people.saveAsParquetFile("hdfs://cloud25:9000/data/people.parquet")
scala> val parquetFile = sqlContext.parquetFile("hdfs://cloud25:9000/data/people.parquet")
scala> people.registerTempTable("people")
scala> parquetFile.registerTempTable("parquetFile")
scala> val jointbls = sqlContext.sql("SELECT people.name FROM people join parquetFile where people.name=parquetFile.name")
scala> jointbls.collect.foreach(println)
3 hiveContext
SparkSQL 1.4.1 版本,可以動過兩種方式與 hive 集成,第一種是使用已經部署好的 metastore ,如果使用這種方式需要在 spark 中添加 hive 的相關配置,第二種 : 如果沒有配置相關的信息, hiveContext 會在當前目錄自己創建 metadata_db 以及 warehouse
配置過程
以下配置流程僅僅針對描述中的第一種情況(使用已有 metastore )
l 後臺啟動 metasore 服務
l 創建 /opt/cloud/spark-1.4.1-bin-hadoop2.6/conf/hive-site.xml 文件,並編輯內容如下:
<configuration>
<property>
<name>hive.metastore.uris/<name>
<value>thrift://cloud25:9083/<value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore./<description>
啟動 spark-shell:
./spark-shell --master=spark://cloud25:7077 --executor-memory=2g
應用案例:
l 使用已有 metastore 的代碼
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.sql("select count(*) from SOGOUQ1 where S_SEQ=1 and C_SEQ=2").collect.foreach(println)
scala> hiveContext.sql("select count(*) from SOGOUQ1 where S_SEQ=1 and C_SEQ=2 and WEBSITE like '%baidu%'").collect.foreach(println)
scala> hiveContext.sql("select WEBSESSION,count(WEBSESSION) as cw from SOGOUQ1 group by WEBSESSION order by cw desc limit 10").collect.foreach(println)
l 獨立維護 metastore 的代碼
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
閱讀更多 情報通 的文章