Broadcast State是Flink 1.5引入的功能,本文將跟大家分享Broadcast State的潛在使用場景,並使用電商用戶行為分析的例子來演示Broadcast State的使用方法。關於Flink狀態的基本原理,Keyed State和Operator State的使用方法,可以參考我之前的文章: 。
本文代碼已上傳到github:https://github.com/luweizheng/flink-tutorials
Broadcast State使用場景
無論是分佈式批處理還是流處理,將部分數據同步到所有實例上是一個十分常見的需求。例如,我們需要依賴一個不斷變化的控制規則來處理主數據流的數據,主數據流數據量比較大,只能分散到多個算子實例上,控制規則數據相對比較小,可以分發到所有的算子實例上。Broadcast State與直接在時間窗口進行兩個數據流的Join的不同點在於,控制規則數據量較小,可以直接放到每個算子實例裡,這樣可以大大提高主數據流的處理速度。
我們繼續使用電商平臺用戶行為分析為例,不同類型的用戶往往有特定的行為模式,有些用戶購買慾望強烈,有些用戶反覆猶豫才下單,有些用戶頻繁爬取數據,有盜刷數據的嫌疑,電商平臺運營人員為了提升商品的購買轉化率,保證平臺的使用體驗,經常會進行一些用戶行為模式分析。基於這個場景,我們可以構建一個Flink作業,實時監控識別不同模式的用戶。為了避免每次更新規則模式後重啟部署,我們可以將規則模式作為一個數據流與用戶行為數據流connect在一起,並將規則模式以Broadcast State的形式廣播到每個算子實例上。
電商用戶行為識別案例
下面開始具體構建一個實例程序。第一步,我們定義一些必要的數據結構來描述這個業務場景,包括用戶行為和規則模式兩個數據結構。
<code>/** * 用戶行為 * categoryId為商品類目ID * behavior包括點擊(pv)、購買(buy)、加購物車(cart)、喜歡(fav) * */case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)/** * 行為模式 * 整個模式簡化為兩個行為 * */case class BehaviorPattern(firstBehavior: String, secondBehavior: String)/<code>
然後我們在主邏輯中讀取兩個數據流:
<code>// 主數據流val userBehaviorStream: DataStream[UserBehavior] = ...// BehaviorPattern數據流val patternStream: DataStream[BehaviorPattern] = .../<code>
目前Broadcast State只支持使用Key-Value形式,需要使用MapStateDescriptor來描述。這裡我們使用一個比較簡單的行為模式,因此Key是一個空類型。當然我們也可以根據業務場景,構造複雜的Key-Value對。然後,我們將模式流使用broadcast方法廣播到所有算子子任務上。
<code>// Broadcast State只能使用 Key->Value 結構,基於MapStateDescriptorval broadcastStateDescriptor =new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream.broadcast(broadcastStateDescriptor)/<code>
用戶行為模式流先按照用戶ID進行keyBy,然後與廣播流合併:
<code>// 生成一個KeyedStreamval keyedStream = userBehaviorStream.keyBy(user => user.userId)// 在KeyedStream上進行connect和processval matchedStream = keyedStream .connect(broadcastStream) .process(new BroadcastPatternFunction)/<code>
BroadcastPatternFunction是KeyedBroadcastProcessFunction的具體實現,它基於Broadcast State處理主數據流,生成(Long, BehaviorPattern),分別表示用戶ID和命中的行為模式。下面的代碼展示了具體的使用方法。
<code>/** * 四個泛型分別為: * 1. KeyedStream中Key的數據類型 * 2. 主數據流的數據類型 * 3. 廣播流的數據類型 * 4. 輸出類型 * */class BroadcastPatternFunctionextends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] { // 用戶上次性能狀態句柄,每個用戶存儲一個狀態 private var lastBehaviorState: ValueState[String] = _ // Broadcast State Descriptor private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _ override def open(parameters: Configuration): Unit = { lastBehaviorState = getRuntimeContext.getState( new ValueStateDescriptor[String]("lastBehaviorState", classOf[String]) ) bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern]) } // 當BehaviorPattern流有新數據時,更新BroadcastState override def processBroadcastElement(pattern: BehaviorPattern, context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context, collector: Collector[(Long, BehaviorPattern)]): Unit = { val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc) // 將新數據更新至Broadcast State,這裡使用一個null作為Key // 在本場景中所有數據都共享一個Pattern,因此這裡偽造了一個Key bcPatternState.put(null, pattern) } override def processElement(userBehavior: UserBehavior, context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext, collector: Collector[(Long, BehaviorPattern)]): Unit = { // 獲取最新的Broadcast State val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null) val lastBehavior: String = lastBehaviorState.value() if (pattern != null && lastBehavior != null) { // 用戶之前有過行為,檢查是否符合給定的模式 if (pattern.firstBehavior.equals(lastBehavior) && pattern.secondBehavior.equals(userBehavior.behavior)) // 當前用戶行為符合模式 collector.collect((userBehavior.userId, pattern)) } lastBehaviorState.update(userBehavior.behavior) }}/<code>
總結下來,使用Broadcast State需要進行下面三步:
- 接收一個普通數據流,並使用broadcast方法將其轉換為BroadcastStream,因為Broadcast State目前只支持Key-Value結構,需要使用MapStateDescriptor描述它的數據結構。
- 將BroadcastStream與一個DataStream或KeyedStream使用connect方法連接到一起。
- 實現一個ProcessFunction,如果主流是DataStream,則需要實現BroadcastProcessFunction;如果主流是KeyedStream,則需要實現KeyedBroadcastProcessFunction。這兩種函數都提供了時間和狀態的訪問方法。
在KeyedBroadcastProcessFunction個函數類中,有兩個函數需要實現:
- processElement:處理主數據流(非Broadcast流)中的每條元素,輸出零到多個數據。ReadOnlyContext 可以獲取時間和狀態,但是隻能以只讀的形式讀取Broadcast State,不能修改,以保證每個算子實例上的Broadcast State都是相同的。
- processBroadcastElement:處理流入的廣播流,可以輸出零到多個數據,一般用來更新Broadcast State。
此外,在KeyedBroadcastProcessFunction中可以註冊Timer,並在onTimer方法中實現回調邏輯。本例中為了保持代碼簡潔,沒有使用,一般可以用來清空狀態,避免狀態無限增長下去。
小結
本文解釋了Broadcast State的原理和使用場景,並以電商平臺用戶行為分析為例演示了具體的使用方法。
閱讀更多 皮皮魯的AI星球 的文章