一、测试总结
1.spark-sql可以读取多层级parquet格式的数据,使用字段名1.字段名2....字段名N即可
2.读取的最后一层数据的类型如果是基本数据类型,且之前的每层的数组都不为数组的话,读取出来的数据可以直接转换成对应的数据类型使用,如id.tdid
3.读取的最后一层的类型如果是基本数据类型但之前的每层的数据中有数组类型的话,需要根据对应的schema解析
如id.sim.imsi得到的是imsi的一个数组,想要获取imsi的值,需要如下操作:
sqlContext.read.parquet(in).select("id.sim.imsi").map { row =>
row.getAs[Seq[String]](row.fieldIndex("imsi"))
}
4.[很重要]利用hive表中的LATERAL VIEW同时读取多个复合结构.读取这种结构有很多种方式,这种是最简单的一种。注意哦,以后很可能会用到。
二、测试sql及结果
1.读取二级数据且一级为数组,二级数据类型为基本类型
如:location.lat
测试sql及结果:
sqlContext.read.parquet(in).select("location.lat").show()
+---+
|lat|
+---+
| []|
| []|
+---+
2.读取二级数据且一级不为数组,数据类型为基本类型
如id.tdid,tdid为string类型
测试sql及结果:
sqlContext.read.parquet(in).select("id.tdid").show()
+--------------------+
| tdid|
+--------------------+
|33c8ff7c56bea602a...|
|315e2b810fe961091...|
+--------------------+
3.读取二级数据且一级不为数组,数据类型为数组
如id.sim,sim为sim对象的一个数组
测试sql及结果:
sqlContext.read.parquet(in).select("id.sim").show()
+--------------------+
| sim|
+--------------------+|
[[null,null,null,...|
|[[null,null,null,...|
+--------------------+
4.读取三级数据且二级不为数组,三级数据类型为基本类型
如device.cpu.name
测试sql及结果:
sqlContext.read.parquet(in).select("device.cpu.name").show()
+--------------------+
| name|
+--------------------+
|AArch64 Processor...|
|ARMv7 Processor r...|
+--------------------+
5.读取三级数据且二级不为数组,三级数据类型为数组
如networks.ip.proxyIp
测试sql及结果:
sqlContext.read.parquet(in).select("networks.ip.proxyIp").show()
+-------+
|proxyIp|
+-------+
| null|
| null|
+-------+
6.读取三级数据且二级为数组,三级数据类型为基本类型
如id.sim.imei
测试sql及结果:
sqlContext.read.parquet(in).select("id.sim.imei").show()
+------+
| imsi|
+------+
|[null]|
|[null]|
|[null]|
+------+
7.处理Array
利用hive表中的LATERAL VIEW处理
scala> val ass = sqlContext.read.parquet("/data/xx/2018-01")
scala> ass.printSchema
root
|-- tdid: string (nullable = true)
|-- offset: long (nullable = true)
|-- assemblyPoint: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- dayType: integer (nullable = true)
| | |-- hour: integer (nullable = true)
| | |-- geoHash: string (nullable = true)
| | |-- source: string (nullable = true)
| | |-- accuracy: double (nullable = true)
| | |-- pointCnt: integer (nullable = true)
scala> ass.registerTempTable("tb_ass")
scala> val a = sql("SELECT offset, ap FROM tb_ass LATERAL VIEW explode(assemblyPoint) adTable AS ap")
scala> a.select("offset","ap.dayType","ap.hour").show(10,false)
+----------+-------+----+
|offset |dayType|hour|
+----------+-------+----+
|6334845336|101 |15 |
|6334845336|119 |21 |
|6334845336|101 |18 |
|6334845336|119 |19 |
+----------+-------+----+
only showing top 10 rows
閱讀更多 從大數據說起 的文章