topN的常見應用場景,最熱商品購買量,最高人氣作者的閱讀量等等。
1. 用到的知識點
Flink創建kafka數據源;
基於 EventTime 處理,如何指定 Watermark;
Flink中的Window,滾動(tumbling)窗口與滑動(sliding)窗口;
State狀態的使用;
ProcessFunction 實現 TopN 功能;
2. 案例介紹
通過用戶訪問日誌,計算最近一段時間平臺最活躍的幾位用戶topN。
創建kafka生產者,發送測試數據到kafka;
消費kafka數據,使用滑動(sliding)窗口,每隔一段時間更新一次排名;
3. 數據源
這裡使用kafka api發送測試數據到kafka,代碼如下:
<code>
classUser{private
long id;private
String
username;private
String
password;private
long timestamp;
}
Map<String
,String
> config = Configuration.initConfig("commons.xml"
);
void
sendData throws InterruptedException {
int cnt =0
;while
(cnt200
){
User user =new
User;
user.setId(cnt);
user.setUsername("username"
+new
Random.nextInt((cnt %5
) +2
));
user.setPassword("password"
+ cnt);
user.setTimestamp(System.currentTimeMillis);
Future future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"
),String
.valueOf(cnt),JSON
.toJSONString(user));while
(!future.isDone){
Thread.sleep(100
);
}try
{
RecordMetadata recordMetadata = future.get;
System.out.println(recordMetadata.offset);
}catch
(InterruptedException e) {
e.printStackTrace;
}catch
(ExecutionException e) {
e.printStackTrace;
}
System.out.println("發送消息:"
+ cnt +"******"
+ user.toString);
cnt = cnt +1
;
}
}
/<code>
這裡通過隨機數來擾亂username,便於使用戶名大小不一,讓結果更加明顯。KafkaUtil是自己寫的一個kafka工具類,代碼很簡單,主要是平時做測試方便。
4. 主要程序
創建一個main程序,開始編寫代碼。
創建flink環境,關聯kafka數據源。
<code>Map config = Configuration.initConfig("commons.xml"
);
Properties kafkaProps =new
Properties;
kafkaProps.setProperty("zookeeper.connect"
, config.get
("kafka-zookeeper"
));
kafkaProps.setProperty("bootstrap.servers"
, config.get
("kafka-ipport"
));
kafkaProps.setProperty("group.id"
, config.get
("kafka-groupid"
));
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment;
/<code>
EventTime 與 Watermark
<code>senv
.setStreamTimeCharacteristic
(TimeCharacteristic
.EventTime
);
/<code>
設置屬性<code>
senv.setStreamTimeCharacteristic(
TimeCharacteristic.EventTime)/<code>,表示按照數據時間字段來處理,默認是<code>
TimeCharacteristic.ProcessingTime/<code>
<code>private
static
final
TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
/<code>
這個屬性必須設置,否則後面,可能窗口結束無法觸發,導致結果無法輸出。取值有三種:
ProcessingTime:事件被處理的時間。也就是由flink集群機器的系統時間來決定。
EventTime:事件發生的時間。一般就是數據本身攜帶的時間。
IngestionTime:攝入時間,數據進入flink流的時間,跟ProcessingTime還是有區別的;
指定好使用數據的實際時間來處理,接下來需要指定flink程序如何get到數據的時間字段,這裡使用調用DataStream的
assignTimestampsAndWatermarks方法,抽取時間和設置watermark。
<code>senv.addSource(new
FlinkKafkaConsumer010<>(
config.get("kafka-topic"
),new
SimpleStringSchema,
kafkaProps
)
).map(x ->{return
JSON.parseObject(x, User.
class
);
}).assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1000
)) {
longextractTimestamp(User element) {return
element.getTimestamp;
}
})
/<code>
前面給出的代碼中可以看出,由於發送到kafka的時候,將User對象轉換為json字符串了,這裡使用的是fastjson,接收過來可以轉化為JsonObject來處理,我這裡還是將其轉化為User對象<code>JSON.parseObject(x, User.class)/<code>,便於處理。
這裡考慮到數據可能亂序,使用了可以處理亂序的抽象類<code>
BoundedOutOfOrdernessTimestampExtractor/<code>,並且實現了唯一的一個沒有實現的方法<code>extractTimestamp/<code>,亂序數據,會導致數據延遲,在構造方法中傳入了一個<code>Time.milliseconds(1000)/<code>,表明數據可以延遲一秒鐘。比如說,如果窗口長度是10s,0~10s的數據會在11s的時候計算,此時watermark是10,才會觸發計算,也就是說引入watermark處理亂序數據,最多可以容忍0~t這個窗口的數據,最晚在t+1時刻到來。
具體關於watermark的講解可以參考這篇文章
https://blog.csdn.net/qq_39657909/article/details/106081543
窗口統計
業務需求上,通常可能是一個小時,或者過去15分鐘的數據,5分鐘更新一次排名,這裡為了演示效果,窗口長度取10s,每次滑動(slide)5s,即5秒鐘更新一次過去10s的排名數據。
<code>.keyBy
("username
").timeWindow
(Time
.seconds
(10),Time
.seconds
(5)).aggregate
(new
CountAgg
,new
WindowResultFunction
)
/<code>
我們使用<code>.keyBy("username")/<code>對用戶進行分組,使用<code>.timeWindow(Time size, Time slide)/<code>對每個用戶做滑動窗口(10s窗口,5s滑動一次)。然後我們使用<code>.aggregate(AggregateFunction af, WindowFunction wf)/<code>做增量的聚合操作,它能使用<code>AggregateFunction/<code>提前聚合掉數據,減少 state 的存儲壓力。較之<code>.apply(WindowFunction wf)/<code>會將窗口中的數據都存儲下來,最後一起計算要高效地多。<code>aggregate/<code>方法的第一個參數用於
這裡的<code>CountAgg/<code>實現了<code>AggregateFunction/<code>接口,功能是統計窗口中的條數,即遇到一條數據就加一。
<code>public
class
CountAgg
implements
AggregateFunction
<User, Long, Long
>{
Long
createAccumulator {return
0L
;
}
Long
add(User value,Long
accumulator) {return
accumulator +1
;
}
Long
getResult(Long
accumulator) {return
accumulator;
}
Long
merge(Long
a,Long
b) {return
a + b;
}
}
/<code>
<code>.aggregate(AggregateFunction af, WindowFunction wf)/<code>的第二個參數<code>WindowFunction/<code>將每個 key每個窗口聚合後的結果帶上其他信息進行輸出。我們這裡實現的<code>WindowResultFunction/<code>將用戶名,窗口,訪問量封裝成了<code>UserViewCount/<code>進行輸出。
<code>private
static
class
WindowResultFunction
implements
WindowFunction
<Long
,UserViewCount
,Tuple
,TimeWindow
> {
void
apply
(Tuple key, TimeWindow window, Iterable input, Collector out)
throws
Exception {
Long count = input.iterator.next;
out.collect(new
UserViewCount(((Tuple1)key).f0, window.getEnd, count));
}
}
static
classUserViewCount{private
String userName;private
long
windowEnd;private
long
viewCount;
}
/<code>
TopN計算最活躍用戶
為了統計每個窗口下活躍的用戶,我們需要再次按窗口進行分組,這裡根據<code>UserViewCount/<code>中的<code>windowEnd/<code>進行<code>keyBy/<code>操作。然後使用<code>ProcessFunction/<code>實現一個自定義的 TopN 函數<code>TopNHotItems/<code>來計算點擊量排名前3名的用戶,並將排名結果格式化成字符串,便於後續輸出。
<code>.keyBy("windowEnd"
)
.process(new
TopNHotUsers(3
))
.
/<code>
<code>ProcessFunction/<code>是 Flink 提供的一個 low-level API,用於實現更高級的功能。它主要提供了定時器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時收齊了某個 window 下所有用戶的訪問數據。由於 Watermark 的進度是全局的,在<code>processElement/<code>方法中,每當收到一條數據(<code>ItemViewCount/<code>),我們就註冊一個<code>windowEnd+1/<code>的定時器(Flink 框架會自動忽略同一時間的重複註冊)。<code>windowEnd+1/<code>的定時器被觸發時,意味著收到了<code>windowEnd+1/<code>的 Watermark,即收齊了該<code>windowEnd/<code>下的所有用戶窗口統計值。我們在<code>onTimer/<code>中處理將收集的所有商品及點擊量進行排序,選出 TopN,並將排名信息格式化成字符串後進行輸出。
這裡我們還使用了 <code>ListState/<code>來存儲收到的每條<code>UserViewCount/<code>消息,保證在發生故障時,狀態數據的不丟失和一致性。<code>ListState/<code>是 Flink 提供的類似 Java<code>List/<code>接口的 State API,它集成了框架的 checkpoint 機制,自動做到了 exactly-once 的語義保證。
<code>private staticclass
TopNHotUsers
extends
KeyedProcessFunction
UserViewCount,String
> {private
int
topSize
;
private ListState userViewCountListState;
publicTopNHotUsers(int topSize) {
this.topSize = topSize;
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {super
.onTimer(timestamp, ctx, out);
List userViewCounts = new ArrayList<>;for
(UserViewCount userViewCount : userViewCountListState.get) {
userViewCounts.add(userViewCount);
}
userViewCountListState.clear;
userViewCounts.sort(new Comparator {
@Override
public intcompare(UserViewCount o1, UserViewCount o2) {return
(int)(o2.viewCount - o1.viewCount);
}
});//
將排名信息格式化成 String, 便於打印
StringBuilder result = new StringBuilder;
result.append("====================================\n"
);
result.append("時間: "
).append(new Timestamp(timestamp-1
)).append("\n"
);for
(int i =0
; i < topSize; i++) {
UserViewCount currentItem = userViewCounts.get(i);//
No1:
商品ID=12224
瀏覽量=2413
result.append("No"
).append(i).append(":"
)
.append(" 用戶名="
).append(currentItem.userName)
.append(" 瀏覽量="
).append(currentItem.viewCount)
.append("\n"
);
}
result.append("====================================\n\n"
);
Thread.sleep(1000
);
out.collect(result.toString);
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {super
.open(parameters);
ListStateDescriptor userViewCountListStateDescriptor = new ListStateDescriptor<>("user-state"
,
UserViewCount.
class
);
userViewCountListState = getRuntimeContext.getListState(userViewCountListStateDescriptor);
}
@Override
public void processElement(UserViewCount value, Context ctx, Collector out) throws Exception {
userViewCountListState.add(value);
ctx.timerService.registerEventTimeTimer(value.windowEnd +1000
);
}
}
/<code>
結果輸出
可以看到,每隔5秒鐘更新輸出一次數據。
最後說一句(求關注,別白嫖我)
掃一掃,我們的故事就開始了。
另外公眾號改變了推送規則,大家看文章不要忘記點擊最下方的在看,點贊按鈕,這樣微信自動識別為常看公眾號,否則很可能推送的文章可能淹沒在別的文章找不到,謝謝大家。
讓我知道你在看
關鍵字: UserViewCount 購買量 實時