Spark Streaming 項目實戰 (4)

統計各廣告最近 1 小時內的點擊量趨勢:各廣告最近 1 小時內各分鐘的點擊量

此部分最終想要得到的結果如下:

Spark Streaming 項目實戰 (4)

一. 得到最近1小時廣告點擊量實時統計

  • 1. 新建類LastHourApp
<code>package com.buwenbuhuo.streaming.project.app
import com.buwenbuhuo.streaming.project.bean.AdsInfo
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream

/*
 * @author 不溫卜火
 * @create 2020-08-17 11:19
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 */
object LastHourApp extends App {
  override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = {
    adsInfoStream
        // 1. 先把窗口分好
      .window(Minutes(60),Seconds(3))
        // 2. 按照廣告分鐘 進行聚合
      .map(info => (info.adsId,info.hmString) -> 1)
      .reduceByKey(_+_)
        // 3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起
      .map{
        case ((ads,hm),count) => (ads,(hm,count))
      }
      .groupByKey()
      .print(10000)
  }
}
/*
統計各廣告最近1小時內的點擊量趨勢:各廣告最近1小時內各分鐘的點擊量,每6秒統計一次
1. 各廣告,每分鐘            ->         按照(廣告,分鐘) 分組
2. 最近1小時,每6秒統計一次   ->          窗口: 窗口長度1小時  窗口的滑動步長 5s

----

1. 先把窗口分好
2. 按照廣告分鐘 進行聚合
3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起
4. 把結果寫在redis中

 *//<code>
  • 2. 運行結果
Spark Streaming 項目實戰 (4)

二. 寫入到redis中

  • 1. 源碼(添加)
<code>        // 4. 把結果寫在redis中
      .foreachRDD(rdd => {
        rdd.foreachPartition(it=>{
          if (it.nonEmpty){ // 只是判斷是否有下一個元素,指針不會跳過這個元素
            // 1. 先建立到redis連接
            val client: Jedis = RedisUtil.getClient
            // 2. 寫元素到redis
            // 2.1 一個一個的寫(昨天)
            // 2.2 批次寫入
            import org.json4s.JsonDSL._

            val key: String = "last:ads:hour:count"
            val map: Map[String, String] = it.toMap.map {
              case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it)))

            }
            // scala集合轉換成java集合
            import scala.collection.JavaConversions._
            println(map)
            client.hmset(key,map)

            // 3. 關閉redis(用的是連接池,實際是把連接歸還給連接池)
            client.close()

          }

        })/<code>
  • 2. 運行結果
Spark Streaming 項目實戰 (4)

  • 3. 在redis中查看
Spark Streaming 項目實戰 (4)

三. 完整代碼

<code>package com.buwenbuhuo.streaming.project.app
import com.buwenbuhuo.streaming.project.bean.AdsInfo
import com.buwenbuhuo.streaming.project.util.RedisUtil
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis

/*
 * @author 不溫卜火
 * @create 2020-08-17 11:19
 * MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 */
object LastHourApp extends App {
  override def doSomething(adsInfoStream: DStream[AdsInfo]): Unit = {
    adsInfoStream
        // 1. 先把窗口分好
      .window(Minutes(60),Seconds(3))
        // 2. 按照廣告分鐘 進行聚合
      .map(info => (info.adsId,info.hmString) -> 1)
      .reduceByKey(_+_)
        // 3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起
      .map{
        case ((ads,hm),count) => (ads,(hm,count))
      }
      .groupByKey()
        // 4. 把結果寫在redis中
      .foreachRDD(rdd => {
        rdd.foreachPartition(it=>{
          if (it.nonEmpty){ // 只是判斷是否有下一個元素,指針不會跳過這個元素
            // 1. 先建立到redis連接
            val client: Jedis = RedisUtil.getClient
            // 2. 寫元素到redis
            // 2.1 一個一個的寫(昨天)
            // 2.2 批次寫入
            import org.json4s.JsonDSL._

            val key: String = "last:ads:hour:count"
            val map: Map[String, String] = it.toMap.map {
              case (adsId, it) => (adsId, JsonMethods.compact(JsonMethods.render(it)))

            }
            // scala集合轉換成java集合
            import scala.collection.JavaConversions._
            println(map)
            client.hmset(key,map)

            // 3. 關閉redis(用的是連接池,實際是把連接歸還給連接池)
            client.close()

          }

        })
      })
  }
}

/*
統計各廣告最近1小時內的點擊量趨勢:各廣告最近1小時內各分鐘的點擊量,每6秒統計一次
1. 各廣告,每分鐘            ->         按照(廣告,分鐘) 分組
2. 最近1小時,每6秒統計一次   ->          窗口: 窗口長度1小時  窗口的滑動步長 5s

----

1. 先把窗口分好
2. 按照廣告分鐘 進行聚合
3. 再按照廣告分組,把這個廣告下所有的分鐘記錄放在一起
4. 把結果寫在redis中

----

寫到redis的時候的數據的類型
1.
  key                              value
  廣告id                           json字符串每分鐘的點擊量

2.
  key                              value
  "last:ads:hour:count"            hash
                                   field       value
                                   adsId       json字符串
                                   "1"         {"09:24":100,"09:25":110,...}

 *//<code> 


分享到:


相關文章: