Flink Broadcast State實戰案例:電商平臺用戶行為模式分析

Broadcast State是Flink 1.5引入的功能,本文將跟大家分享Broadcast State的潛在使用場景,並使用電商用戶行為分析的例子來演示Broadcast State的使用方法。關於Flink狀態的基本原理,Keyed State和Operator State的使用方法,可以參考我之前的文章: 。

本文代碼已上傳到github:https://github.com/luweizheng/flink-tutorials

Broadcast State使用場景

無論是分佈式批處理還是流處理,將部分數據同步到所有實例上是一個十分常見的需求。例如,我們需要依賴一個不斷變化的控制規則來處理主數據流的數據,主數據流數據量比較大,只能分散到多個算子實例上,控制規則數據相對比較小,可以分發到所有的算子實例上。Broadcast State與直接在時間窗口進行兩個數據流的Join的不同點在於,控制規則數據量較小,可以直接放到每個算子實例裡,這樣可以大大提高主數據流的處理速度。

Flink Broadcast State實戰案例:電商平臺用戶行為模式分析

我們繼續使用電商平臺用戶行為分析為例,不同類型的用戶往往有特定的行為模式,有些用戶購買慾望強烈,有些用戶反覆猶豫才下單,有些用戶頻繁爬取數據,有盜刷數據的嫌疑,電商平臺運營人員為了提升商品的購買轉化率,保證平臺的使用體驗,經常會進行一些用戶行為模式分析。基於這個場景,我們可以構建一個Flink作業,實時監控識別不同模式的用戶。為了避免每次更新規則模式後重啟部署,我們可以將規則模式作為一個數據流與用戶行為數據流connect在一起,並將規則模式以Broadcast State的形式廣播到每個算子實例上。

電商用戶行為識別案例

下面開始具體構建一個實例程序。第一步,我們定義一些必要的數據結構來描述這個業務場景,包括用戶行為和規則模式兩個數據結構。

<code>/**    * 用戶行為    * categoryId為商品類目ID    * behavior包括點擊(pv)、購買(buy)、加購物車(cart)、喜歡(fav)    * */case class UserBehavior(userId: Long,                        itemId: Long,                        categoryId: Int,                        behavior: String,                        timestamp: Long)/**    * 行為模式    * 整個模式簡化為兩個行為    * */case class BehaviorPattern(firstBehavior: String, secondBehavior: String)/<code>

然後我們在主邏輯中讀取兩個數據流:

<code>// 主數據流val userBehaviorStream: DataStream[UserBehavior] = ...// BehaviorPattern數據流val patternStream: DataStream[BehaviorPattern] = .../<code>

目前Broadcast State只支持使用Key-Value形式,需要使用MapStateDescriptor來描述。這裡我們使用一個比較簡單的行為模式,因此Key是一個空類型。當然我們也可以根據業務場景,構造複雜的Key-Value對。然後,我們將模式流使用broadcast方法廣播到所有算子子任務上。

<code>// Broadcast State只能使用 Key->Value 結構,基於MapStateDescriptorval broadcastStateDescriptor =new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream.broadcast(broadcastStateDescriptor)/<code>

用戶行為模式流先按照用戶ID進行keyBy,然後與廣播流合併:

<code>// 生成一個KeyedStreamval keyedStream =  userBehaviorStream.keyBy(user => user.userId)// 在KeyedStream上進行connect和processval matchedStream = keyedStream  .connect(broadcastStream)  .process(new BroadcastPatternFunction)/<code>

BroadcastPatternFunction是KeyedBroadcastProcessFunction的具體實現,它基於Broadcast State處理主數據流,生成(Long, BehaviorPattern),分別表示用戶ID和命中的行為模式。下面的代碼展示了具體的使用方法。

<code>/**    * 四個泛型分別為:    * 1. KeyedStream中Key的數據類型    * 2. 主數據流的數據類型    * 3. 廣播流的數據類型    * 4. 輸出類型    * */class BroadcastPatternFunctionextends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {  // 用戶上次性能狀態句柄,每個用戶存儲一個狀態  private var lastBehaviorState: ValueState[String] = _  // Broadcast State Descriptor  private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _  override def open(parameters: Configuration): Unit = {    lastBehaviorState = getRuntimeContext.getState(      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String])    )    bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])  }  // 當BehaviorPattern流有新數據時,更新BroadcastState  override def processBroadcastElement(pattern: BehaviorPattern,                                       context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,                                       collector: Collector[(Long, BehaviorPattern)]): Unit = {    val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc)    // 將新數據更新至Broadcast State,這裡使用一個null作為Key    // 在本場景中所有數據都共享一個Pattern,因此這裡偽造了一個Key    bcPatternState.put(null, pattern)  }  override def processElement(userBehavior: UserBehavior,                              context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,                              collector: Collector[(Long, BehaviorPattern)]): Unit = {    // 獲取最新的Broadcast State    val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null)    val lastBehavior: String = lastBehaviorState.value()    if (pattern != null && lastBehavior != null) {      // 用戶之前有過行為,檢查是否符合給定的模式      if (pattern.firstBehavior.equals(lastBehavior) &&          pattern.secondBehavior.equals(userBehavior.behavior))      // 當前用戶行為符合模式      collector.collect((userBehavior.userId, pattern))    }    lastBehaviorState.update(userBehavior.behavior)  }}/<code>

總結下來,使用Broadcast State需要進行下面三步:

  1. 接收一個普通數據流,並使用broadcast方法將其轉換為BroadcastStream,因為Broadcast State目前只支持Key-Value結構,需要使用MapStateDescriptor描述它的數據結構。
  2. 將BroadcastStream與一個DataStream或KeyedStream使用connect方法連接到一起。
  3. 實現一個ProcessFunction,如果主流是DataStream,則需要實現BroadcastProcessFunction;如果主流是KeyedStream,則需要實現KeyedBroadcastProcessFunction。這兩種函數都提供了時間和狀態的訪問方法。

在KeyedBroadcastProcessFunction個函數類中,有兩個函數需要實現:

  • processElement:處理主數據流(非Broadcast流)中的每條元素,輸出零到多個數據。ReadOnlyContext 可以獲取時間和狀態,但是隻能以只讀的形式讀取Broadcast State,不能修改,以保證每個算子實例上的Broadcast State都是相同的。
  • processBroadcastElement:處理流入的廣播流,可以輸出零到多個數據,一般用來更新Broadcast State。

此外,在KeyedBroadcastProcessFunction中可以註冊Timer,並在onTimer方法中實現回調邏輯。本例中為了保持代碼簡潔,沒有使用,一般可以用來清空狀態,避免狀態無限增長下去。

小結

本文解釋了Broadcast State的原理和使用場景,並以電商平臺用戶行為分析為例演示了具體的使用方法。


分享到:


相關文章: