Spark Streaming 項目實戰 (4)

統計各廣告最近 1 小時內的點擊量趨勢:各廣告最近 1 小時內各分鐘的點擊量

此部分最終想要得到的結果如下:

一. 得到最近1小時廣告點擊量實時統計

1. 新建類LastHourApp

<code>package com.buwenbuhuo.streaming.project.app import com.buwenbuhuo.streaming.project.bean.AdsInfo import org.apache.spark.streaming.{Minutes, Seconds} import org.apache.spark.streaming.dstream.DStream /* * @author 不溫卜火 * @create 2020-08-17 11:19 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ */ object LastHourApp extends App { override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = { adsInfoStream // 1. 先把窗口分好 .window(Minutes(60),Seconds(3)) // 2. 按照廣告分鐘 進行聚合 .map(info => (info.adsId,info.hmString) -> 1) .reduceByKey(_+_) // 3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起 .map{ case ((ads,hm),count) => (ads,(hm,count)) } .groupByKey() .print(10000) } } /* 統計各廣告最近1小時內的點擊量趨勢:各廣告最近1小時內各分鐘的點擊量,每6秒統計一次 1. 各廣告,每分鐘 -> 按照(廣告,分鐘) 分組 2. 最近1小時,每6秒統計一次 -> 窗口: 窗口長度1小時 窗口的滑動步長 5s ---- 1. 先把窗口分好 2. 按照廣告分鐘 進行聚合 3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起 4. 把結果寫在redis中 *//<code> 2. 運行結果

二. 寫入到redis中

1. 源碼(添加)

<code> // 4. 把結果寫在redis中 .foreachRDD(rdd => { rdd.foreachPartition(it=>{ if (it.nonEmpty){ // 只是判斷是否有下一個元素,指針不會跳過這個元素 // 1. 先建立到redis連接 val client: Jedis = RedisUtil.getClient // 2. 寫元素到redis // 2.1 一個一個的寫(昨天) // 2.2 批次寫入 import org.json4s.JsonDSL._ val key: String = "last:ads:hour:count" val map: Map[String, String] = it.toMap.map { case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it))) } // scala集合轉換成java集合 import scala.collection.JavaConversions._ println(map) client.hmset(key,map) // 3. 關閉redis(用的是連接池,實際是把連接歸還給連接池) client.close() } })/<code> 2. 運行結果

3. 在redis中查看

三. 完整代碼

<code>package com.buwenbuhuo.streaming.project.app import com.buwenbuhuo.streaming.project.bean.AdsInfo import com.buwenbuhuo.streaming.project.util.RedisUtil import org.apache.spark.streaming.{Minutes, Seconds} import org.apache.spark.streaming.dstream.DStream import org.json4s.jackson.JsonMethods import redis.clients.jedis.Jedis /* * @author 不溫卜火 * @create 2020-08-17 11:19 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ */ object LastHourApp extends App { override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = { adsInfoStream // 1. 先把窗口分好 .window(Minutes(60),Seconds(3)) // 2. 按照廣告分鐘 進行聚合 .map(info => (info.adsId,info.hmString) -> 1) .reduceByKey(_+_) // 3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起 .map{ case ((ads,hm),count) => (ads,(hm,count)) } .groupByKey() // 4. 把結果寫在redis中 .foreachRDD(rdd => { rdd.foreachPartition(it=>{ if (it.nonEmpty){ // 只是判斷是否有下一個元素,指針不會跳過這個元素 // 1. 先建立到redis連接 val client: Jedis = RedisUtil.getClient // 2. 寫元素到redis // 2.1 一個一個的寫(昨天) // 2.2 批次寫入 import org.json4s.JsonDSL._ val key: String = "last:ads:hour:count" val map: Map[String, String] = it.toMap.map { case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it))) } // scala集合轉換成java集合 import scala.collection.JavaConversions._ println(map) client.hmset(key,map) // 3. 關閉redis(用的是連接池,實際是把連接歸還給連接池) client.close() } }) }) } } /* 統計各廣告最近1小時內的點擊量趨勢:各廣告最近1小時內各分鐘的點擊量,每6秒統計一次 1. 各廣告,每分鐘 -> 按照(廣告,分鐘) 分組 2. 最近1小時,每6秒統計一次 -> 窗口: 窗口長度1小時 窗口的滑動步長 5s ---- 1. 先把窗口分好 2. 按照廣告分鐘 進行聚合 3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起 4. 把結果寫在redis中 ---- 寫到redis的時候的數據的類型 1. key value 廣告id json字符串每分鐘的點擊量 2. key value "last:ads:hour:count" hash field value adsId json字符串 "1" {"09:24":100,"09:25":110,...} *//<code>