Flink實時計算topN熱榜

topN的常見應用場景,最熱商品購買量,最高人氣作者的閱讀量等等。

1. 用到的知識點

  • Flink創建kafka數據源;

  • 基於 EventTime 處理,如何指定 Watermark;

  • Flink中的Window,滾動(tumbling)窗口與滑動(sliding)窗口;

  • State狀態的使用;

  • ProcessFunction 實現 TopN 功能;

2. 案例介紹

通過用戶訪問日誌,計算最近一段時間平臺最活躍的幾位用戶topN。

  • 創建kafka生產者,發送測試數據到kafka;

  • 消費kafka數據,使用滑動(sliding)窗口,每隔一段時間更新一次排名;

3. 數據源

這裡使用kafka api發送測試數據到kafka,代碼如下:

<code>



classUser{

private

long id;

private

String

username;

private

String

password;

private

long timestamp;
}

Map<

String

,

String

> config = Configuration.initConfig(

"commons.xml"

);


void

sendData throws InterruptedException {

int cnt =

0

;

while

(cnt

200

){
User user =

new

User;
user.setId(cnt);
user.setUsername(

"username"

+

new

Random.nextInt((cnt %

5

) +

2

));
user.setPassword(

"password"

+ cnt);
user.setTimestamp(System.currentTimeMillis);
Future future = KafkaUtil.sendDataToKafka(config.get(

"kafka-topic"

),

String

.valueOf(cnt),

JSON

.toJSONString(user));

while

(!future.isDone){
Thread.sleep(

100

);
}

try

{
RecordMetadata recordMetadata = future.get;
System.out.println(recordMetadata.offset);
}

catch

(InterruptedException e) {
e.printStackTrace;
}

catch

(ExecutionException e) {
e.printStackTrace;
}
System.out.println(

"發送消息:"

+ cnt +

"******"

+ user.toString);
cnt = cnt +

1

;
}
}
/<code>

這裡通過隨機數來擾亂username,便於使用戶名大小不一,讓結果更加明顯。KafkaUtil是自己寫的一個kafka工具類,代碼很簡單,主要是平時做測試方便。

4. 主要程序

創建一個main程序,開始編寫代碼。

創建flink環境,關聯kafka數據源。

<code>Map config = Configuration.initConfig(

"commons.xml"

);

Properties kafkaProps =

new

Properties;
kafkaProps.setProperty(

"zookeeper.connect"

, config.

get

(

"kafka-zookeeper"

));
kafkaProps.setProperty(

"bootstrap.servers"

, config.

get

(

"kafka-ipport"

));
kafkaProps.setProperty(

"group.id"

, config.

get

(

"kafka-groupid"

));

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment;
/<code>

EventTime 與 Watermark

<code>

senv

.setStreamTimeCharacteristic

(

TimeCharacteristic

.EventTime

);
/<code>

設置屬性<code>
senv.setStreamTimeCharacteristic(
TimeCharacteristic.EventTime)/<code>,表示按照數據時間字段來處理,默認是<code>
TimeCharacteristic.ProcessingTime/<code>

<code>

private

static

final

TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
/<code>

這個屬性必須設置,否則後面,可能窗口結束無法觸發,導致結果無法輸出。取值有三種:

  • ProcessingTime:事件被處理的時間。也就是由flink集群機器的系統時間來決定。

  • EventTime:事件發生的時間。一般就是數據本身攜帶的時間。

  • IngestionTime:攝入時間,數據進入flink流的時間,跟ProcessingTime還是有區別的;

指定好使用數據的實際時間來處理,接下來需要指定flink程序如何get到數據的時間字段,這裡使用調用DataStream的
assignTimestampsAndWatermarks方法,抽取時間和設置watermark。

<code>senv.addSource(

new

FlinkKafkaConsumer010<>(
config.get(

"kafka-topic"

),

new

SimpleStringSchema,
kafkaProps
)
).map(x ->{

return

JSON.parseObject(x, User

.

class

)

;
}).assignTimestampsAndWatermarks(

new

BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(

1000

)) {

longextractTimestamp(User element) {

return

element.getTimestamp;
}
})
/<code>

前面給出的代碼中可以看出,由於發送到kafka的時候,將User對象轉換為json字符串了,這裡使用的是fastjson,接收過來可以轉化為JsonObject來處理,我這裡還是將其轉化為User對象<code>JSON.parseObject(x, User.class)/<code>,便於處理。

這裡考慮到數據可能亂序,使用了可以處理亂序的抽象類<code>
BoundedOutOfOrdernessTimestampExtractor/<code>,並且實現了唯一的一個沒有實現的方法<code>extractTimestamp/<code>,亂序數據,會導致數據延遲,在構造方法中傳入了一個<code>Time.milliseconds(1000)/<code>,表明數據可以延遲一秒鐘。比如說,如果窗口長度是10s,0~10s的數據會在11s的時候計算,此時watermark是10,才會觸發計算,也就是說引入watermark處理亂序數據,最多可以容忍0~t這個窗口的數據,最晚在t+1時刻到來。

Flink實時計算topN熱榜

具體關於watermark的講解可以參考這篇文章

https://blog.csdn.net/qq_39657909/article/details/106081543

窗口統計

業務需求上,通常可能是一個小時,或者過去15分鐘的數據,5分鐘更新一次排名,這裡為了演示效果,窗口長度取10s,每次滑動(slide)5s,即5秒鐘更新一次過去10s的排名數據。

<code>

.keyBy

("

username

")

.timeWindow

(

Time

.seconds

(10),

Time

.seconds

(5))

.aggregate

(

new

CountAgg

,

new

WindowResultFunction

)
/<code>

我們使用<code>.keyBy("username")/<code>對用戶進行分組,使用<code>.timeWindow(Time size, Time slide)/<code>對每個用戶做滑動窗口(10s窗口,5s滑動一次)。然後我們使用<code>.aggregate(AggregateFunction af, WindowFunction wf)/<code>做增量的聚合操作,它能使用<code>AggregateFunction/<code>提前聚合掉數據,減少 state 的存儲壓力。較之<code>.apply(WindowFunction wf)/<code>會將窗口中的數據都存儲下來,最後一起計算要高效地多。<code>aggregate/<code>方法的第一個參數用於

這裡的<code>CountAgg/<code>實現了<code>AggregateFunction/<code>接口,功能是統計窗口中的條數,即遇到一條數據就加一。

<code>

public

class

CountAgg

implements

AggregateFunction

<

User, Long, Long

>

{

Long

createAccumulator {

return

0L

;
}


Long

add(User value,

Long

accumulator) {

return

accumulator +

1

;
}


Long

getResult(

Long

accumulator) {

return

accumulator;
}


Long

merge(

Long

a,

Long

b) {

return

a + b;
}
}
/<code>

<code>.aggregate(AggregateFunction af, WindowFunction wf)/<code>的第二個參數<code>WindowFunction/<code>將每個 key每個窗口聚合後的結果帶上其他信息進行輸出。我們這裡實現的<code>WindowResultFunction/<code>將用戶名,窗口,訪問量封裝成了<code>UserViewCount/<code>進行輸出。

<code>

private

static

class

WindowResultFunction

implements

WindowFunction

<

Long

,

UserViewCount

,

Tuple

,

TimeWindow

>

{



void

apply

(Tuple key, TimeWindow window, Iterable input, Collector out)

throws

Exception

{
Long count = input.iterator.next;
out.collect(

new

UserViewCount(((Tuple1)key).f0, window.getEnd, count));
}
}





static

classUserViewCount{

private

String userName;

private

long

windowEnd;

private

long

viewCount;

}
/<code>

TopN計算最活躍用戶

為了統計每個窗口下活躍的用戶,我們需要再次按窗口進行分組,這裡根據<code>UserViewCount/<code>中的<code>windowEnd/<code>進行<code>keyBy/<code>操作。然後使用<code>ProcessFunction/<code>實現一個自定義的 TopN 函數<code>TopNHotItems/<code>來計算點擊量排名前3名的用戶,並將排名結果格式化成字符串,便於後續輸出。

<code>.keyBy(

"windowEnd"

)
.process(

new

TopNHotUsers(

3

))
.

print

;
/<code>

<code>ProcessFunction/<code>是 Flink 提供的一個 low-level API,用於實現更高級的功能。它主要提供了定時器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時收齊了某個 window 下所有用戶的訪問數據。由於 Watermark 的進度是全局的,在<code>processElement/<code>方法中,每當收到一條數據(<code>ItemViewCount/<code>),我們就註冊一個<code>windowEnd+1/<code>的定時器(Flink 框架會自動忽略同一時間的重複註冊)。<code>windowEnd+1/<code>的定時器被觸發時,意味著收到了<code>windowEnd+1/<code>的 Watermark,即收齊了該<code>windowEnd/<code>下的所有用戶窗口統計值。我們在<code>onTimer/<code>中處理將收集的所有商品及點擊量進行排序,選出 TopN,並將排名信息格式化成字符串後進行輸出。

這裡我們還使用了 <code>ListState/<code>來存儲收到的每條<code>UserViewCount/<code>消息,保證在發生故障時,狀態數據的不丟失和一致性。<code>ListState/<code>是 Flink 提供的類似 Java<code>List/<code>接口的 State API,它集成了框架的 checkpoint 機制,自動做到了 exactly-once 的語義保證。

<code>private static 

class

TopNHotUsers

extends

KeyedProcessFunction

UserViewCount

,

String

> {

private

int

topSize

;


private ListState userViewCountListState;

publicTopNHotUsers(int topSize) {
this.topSize = topSize;
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {

super

.onTimer(timestamp, ctx, out);
List userViewCounts = new ArrayList<>;

for

(UserViewCount userViewCount : userViewCountListState.get) {
userViewCounts.add(userViewCount);
}

userViewCountListState.clear;

userViewCounts.sort(new Comparator {
@Override
public intcompare(UserViewCount o1, UserViewCount o2) {

return

(int)(o2.viewCount - o1.viewCount);
}
});

//

將排名信息格式化成 String, 便於打印
StringBuilder result = new StringBuilder;
result.append(

"====================================\n"

);
result.append(

"時間: "

).append(new Timestamp(timestamp-

1

)).append(

"\n"

);

for

(int i =

0

; i < topSize; i++) {
UserViewCount currentItem = userViewCounts.get(i);

//

No1:

商品ID=

12224

瀏覽量=

2413


result.append(

"No"

).append(i).append(

":"

)
.append(

" 用戶名="

).append(currentItem.userName)
.append(

" 瀏覽量="

).append(currentItem.viewCount)
.append(

"\n"

);
}
result.append(

"====================================\n\n"

);

Thread.sleep(

1000

);

out.collect(result.toString);

}

@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {

super

.open(parameters);
ListStateDescriptor userViewCountListStateDescriptor = new ListStateDescriptor<>(

"user-state"

,
UserViewCount

.

class


);


userViewCountListState = getRuntimeContext.getListState(userViewCountListStateDescriptor);

}

@Override
public void processElement(UserViewCount value, Context ctx, Collector out) throws Exception {

userViewCountListState.add(value);
ctx.timerService.registerEventTimeTimer(value.windowEnd +

1000

);
}
}
/<code>

結果輸出

可以看到,每隔5秒鐘更新輸出一次數據。

Flink實時計算topN熱榜
Flink實時計算topN熱榜

最後說一句(求關注,別白嫖我)

掃一掃,我們的故事就開始了。

Flink實時計算topN熱榜

另外公眾號改變了推送規則,大家看文章不要忘記點擊最下方的在看,點贊按鈕,這樣微信自動識別為常看公眾號,否則很可能推送的文章可能淹沒在別的文章找不到,謝謝大家。

讓我知道你在看

Flink實時計算topN熱榜


分享到:


相關文章: