「大數據」(八十二)Spark之SparkSQL應用案例

【導讀:數據是二十一世紀的石油,蘊含巨大價值,這是·情報通·大數據技術系列第[82]篇文章,歡迎閱讀和收藏】

1 背景介紹

SparkSQL 引入了一種新的 RDD —— SchemaRDD , SchemaRDD 由行對象( Row )以及描述行對象中每列數據類型的 Schema 組成; SchemaRDD 很像傳統數據庫中的表。 SchemaRDD 可以通過 RDD 、 Parquet 文件、 JSON 文件、或者通過使用 hiveql 查詢 hive 數據來建立。 SchemaRDD 除了可以和 RDD 一樣操作外,還可以通過 registerTempTable 註冊成臨時表,然後通過 SQL 語句進行操作。

「大數據」(八十二)Spark之SparkSQL應用案例

值得注意的是:

l Spark1.1 使用 registerTempTable 代替 1.0 版本的 registerAsTable

l Spark1.1 在 hiveContext 中, hql() 將被棄用, sql() 將代替 hql() 來提交查詢語句,統一了接口。

l 使用 registerTempTable 註冊表是一個臨時表,生命週期只在所定義的 sqlContext 或 hiveContext 實例之中。換而言之,在一個 sqlContext (或 hiveContext )中 registerTempTable 的表不能在另一個 sqlContext (或 hiveContext )中使用。

l Spark1.1 提供了語法解析器選項 spark.sql.dialect ,就目前而言, Spark1.1 提供了兩種語法解析器: sql 語法解析器和 hiveql 語法解析器。

l sqlContext 現在只支持 sql 語法解析器( SQL-92 語法)

l hiveContext 現在支持 sql 語法解析器和 hivesql 語法解析器,默認為 hivesql 語法解析器,用戶可以通過配置切換成 sql 語法解析器,來運行 hiveql 不支持的語法,如 select 1 。

切換可以通過下列方式完成:

l 在 sqlContexet 中使用 setconf 配置 spark.sql.dialect

l 在 hiveContexet 中使用 setconf 配置 spark.sql.dialect

l 在 sql 命令中使用 set spark.sql.dialect=value

SparkSQL1.1 對數據的查詢分成了 2 個分支: sqlContext 和 hiveContext 。至於兩者之間的關係, hiveSQL 繼承了 sqlContext ,所以擁有 sqlontext 的特性之外,還擁有自身的特性(最大的特性就是支持 hive )。

2 sqlContext

2.1 使用 Case Class 定義 RDD

對於 Case Class 方式,首先要定義 Case Class ,在 RDD 的 Transform 過程中使用 Case Class 可以隱式轉化成 SchemaRDD ,然後再使用 registerTempTable 註冊成表。註冊成表後就可以在 sqlContext 對錶進行操作,如 select 、 insert 、 join 等。注意, case class 可以是嵌套的,也可以使用類似 Sequences 或 Arrays 之類複雜的數據類型。

應用案例:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

scala> import sqlContext.implicits._

scala> case class Person(name:String,age:Int)

scala> val people=sc.textFile("hdfs://cloud25:9000/data/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()

scala> teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

2.2 Parquest 應用案例

sqlContext 可以讀取 parquet 文件,由於 parquet 文件中保留了 schema 的信息,所以不需要使用 case class 來隱式轉換。 sqlContext 讀入 parquet 文件後直接轉換成 SchemaRDD ,也可以將 SchemaRDD 保存成 parquet 文件格式。

應用案例:

scala> import sqlContext.implicits._

scala> sqlContext.setConf("spark.sql.parquet.binaryAsString", "true") // 解決文件中 parquet 中 binary 字段的問題

scala> val wikiData = sqlContext.parquetFile("hdfs://cloud25:9000/data/wiki_parquet").toDF()

scala> wikiData.count()

scala> wikiData.registerTempTable("wikidata")

scala> val countResult = sqlContext.sql("SELECT COUNT(*) FROM wikiData").collect()

scala> val queryResult= sqlContext.sql("SELECT username, COUNT(*) AS cnt FROM wikiData WHERE username <> '' GROUP BY username ORDER BY cnt DESC LIMIT 10")

scala> queryResult.collect().foreach(println)

2.3 Join 應用案例

sqlContext 可以從多個種類的 SchemaRDD 中執行 join 操作

應用案例:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

scala> import sqlContext.implicits._

scala> case class Person(name:String,age:Int)

scala> val people=sc.textFile("hdfs://cloud25:9000/data/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()

scala> people.saveAsParquetFile("hdfs://cloud25:9000/data/people.parquet")

scala> val parquetFile = sqlContext.parquetFile("hdfs://cloud25:9000/data/people.parquet")

scala> people.registerTempTable("people")

scala> parquetFile.registerTempTable("parquetFile")

scala> val jointbls = sqlContext.sql("SELECT people.name FROM people join parquetFile where people.name=parquetFile.name")

scala> jointbls.collect.foreach(println)

3 hiveContext

SparkSQL 1.4.1 版本,可以動過兩種方式與 hive 集成,第一種是使用已經部署好的 metastore ,如果使用這種方式需要在 spark 中添加 hive 的相關配置,第二種 : 如果沒有配置相關的信息, hiveContext 會在當前目錄自己創建 metadata_db 以及 warehouse

配置過程

以下配置流程僅僅針對描述中的第一種情況(使用已有 metastore )

l 後臺啟動 metasore 服務

l 創建 /opt/cloud/spark-1.4.1-bin-hadoop2.6/conf/hive-site.xml 文件,並編輯內容如下:

<configuration>

<property>

<name>hive.metastore.uris/<name>

<value>thrift://cloud25:9083/<value>

<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore./<description>

啟動 spark-shell:

./spark-shell --master=spark://cloud25:7077 --executor-memory=2g


應用案例:

l 使用已有 metastore 的代碼

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

scala> hiveContext.sql("select count(*) from SOGOUQ1 where S_SEQ=1 and C_SEQ=2").collect.foreach(println)

scala> hiveContext.sql("select count(*) from SOGOUQ1 where S_SEQ=1 and C_SEQ=2 and WEBSITE like '%baidu%'").collect.foreach(println)

scala> hiveContext.sql("select WEBSESSION,count(WEBSESSION) as cw from SOGOUQ1 group by WEBSESSION order by cw desc limit 10").collect.foreach(println)

l 獨立維護 metastore 的代碼

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

scala> hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")


分享到:


相關文章: