基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

導讀:繼Wormhole的設計思想介紹和功能介紹之後,相信大家對Wormhole已經有了初步的瞭解。2018年7月31日,我們發佈了Wormhole_0.5新版本,與以往基於Spark計算引擎的版本相比,該版本新增了基於Flink計算引擎的流式處理功能,主要關注低延遲和CEP。基於Flink計算引擎版本具體內容是什麼呢?還請各位看官移步正文~


Wormhole Flink版介紹

數據集(Resilient Distributed Dataset,RDD)進行微批處理,所以Spark在流式處理方面,不可避免會存在一些延時,只支持秒級延遲。Flink基於事件處理,實現了真正的流式計算。與基於Spark的流式處理相比,它的延遲更低。Wormhole通過對Flink計算引擎的支持,將延遲降低到毫秒級。

Wormhole Flink版除了支持Flink SQL,Lookup SQL,新增了對CEP的支持,並且支持三者的混合編排,即一個Flow中可以包含多個Flink SQL,多個Lookup SQL和多個CEP。Flink SQL與Spark SQL用法類似,Spark SQL和Lookup SQL在上一篇Wormhole系列文章中已經介紹過(參考: ),這裡將不再贅述,下面我們將重點講解CEP。

CEP(複雜事件處理)簡介

在傳統DBMS中,所有的操作都只能在數據落庫之後才能進行,這極大地降低了事件處理的實時性。與傳統DBMS不同,CEP從流式事件中查找匹配指定模式的事件,對流式事件邊獲取邊處理,整個處理過程都在數據流中進行,無需落地,因此它擁有更低的延遲,即所有輸入都將被立刻處理,一旦在流式事件中發現了匹配指定模式的事件集,結果就會立即輸出。

正因如此,CEP引起了廣泛的關注,並得到了大量的應用推廣,主要體現在運營和運維兩方面。在運營方面,CEP經常被應用於金融產品中,例如,股票市場趨勢預測、信用卡詐騙預防等。在運維方面,CEP被用在基於RFID的追蹤和監控系統中,例如,檢測庫房中失竊的物品。當然,CEP也能通過指定嫌疑人的行為,來檢測網絡入侵。

Wormhole CEP是基於Flink CEP實現的,並且提供了可視化操作界面,無需編碼即可快速實現業務需求。Wormhole CEP引入了窗口時間(Window Time),窗口策略(Strategy),分組策略(KeyBy),輸出格式(Output),篩選規則(Pattern)等概念。下面,我們逐一介紹這些概念的含義及用途。

  • Window Time:指在觸發了符合Begin Pattern的事件記錄後的窗口時間,如果watermark的time超過了觸發時間+窗口時間,本次pattern結束;
  • Strategy:包含NO_SKIP和SKIP_PAST_LAST_EVENT兩種策略,前者對應事件滑動策略,後者對應事件滾動策略,具體區別可以借鑑下面的例子:

事件滑動:a1 a2 a3 a4 .......

a2 a3 a4 a5 ........

a3 a4 a5 a6 ........

事件滾動: a1 a2 a3 a4 ........

a5 a6 a7 a8 ........

a9 a10 a11 a12......

  • KeyBy:指依據事件中的哪個字段來做分區。例如,現在有一條數據,它的schema包括ums_id_, ums_op_, ums_ts_, value1, value2等幾個字段,這裡選定value1來做分區的依賴字段,那麼,與value1字段相同的數據將被分配到同一個分組上。CEP操作將分別針對每一分組的數據進行處理,KeyBy可以作用在多個字段上。
  • Output:輸出結果的形式,分為三類:Agg、Detail、FilteredRow

✔ Agg:將匹配的多條數據做聚合,生成一條數據輸出。例:field1:avg,field2:max(目前支持max/min/avg/sum)

✔ Detail:將匹配的多條數據逐一輸出

✔ FilteredRow:按條件選擇指定的一條數據輸出。例:head/last/field1:min/max

  • Pattern:篩選規則。每個CEP由若干個Pattern組成。每個Pattern包括以下三個概念:

✔ Operator:操作算子。CEP中的第一個Pattern Operator只能為begin,其後的每個Pattern Operator只能為next、followedBy、notNext、notFollowedBy四種類型中的一種,其含義分別為:

◇ next:會追加一個新的Pattern對象到既有的Pattern之後,它表示當前模式運算符所匹配的事件必須是嚴格緊鄰的,這意味著兩個被匹配的事件必須是前後緊鄰,中間沒有其他元素;

◇ followedBy:會追加一個新的Pattern到既有的Pattern之後(其實返回的是一個FollowedByPattern對象,它是Pattern的派生類),它表示當前運算符所匹配的事件不必嚴格緊鄰,這意味著匹配的兩個事件之間允許穿插其他事件;

◇ notNext:增加一個新的否定模式。匹配(否定)事件必須直接輸出先前的匹配事件(嚴格緊鄰),以便將部分匹配丟棄;

◇ notFollowedBy:會丟棄或者跳過已匹配的事件(注:notFollowedBy不能為最後一個Pattern)。

✔ Quantifier:用來指定滿足某一pattern的事件數量。目前配置包括:一條及以上,指定條數,指定條數及以上;這裡需要特殊說明的是,notNext、notFollowedBy這兩種Operator無法設置Quantifier;

✔ Conditions:判斷條件。用戶可以針對事件的某個或多個屬性設置判斷條件,例如,可以設置只有符合value1 like a and value2 >=10的事件才是符合條件的事件。

Wormhole CEP應用場景

1、場景一:網絡DDOS攻擊警告

Wormhole CEP在日常運維中被廣泛應用。下面以運維中會遇到的一類情況為例,來介紹如何使用Wormhole CEP。

DDOS攻擊是日常運維中經常遇到的一類問題,CEP正好可以用來對DDOS攻擊進行預警。

DDOS攻擊的判斷規則如下:

正常:流量在預設的正常範圍內;

警告:某數據中心在10秒內連續2次上報的流量超過認定的正常值;

報警:某數據中心在30秒內連續2次匹配警告;

通知:報警後需要短信/郵件通知相關人員。

通過上述規則,DDOS攻擊的判斷依據可以被量化為流量超出事件在一定時間內多次產生。只要符合條件,客戶請求就可以被認定為DDOS攻擊。針對符合條件的事件,Wormhole會向Kafka傳入報警消息,並由業務系統去Kafka中消費報警消息,從而進行相應的後續處理。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖1 kafka業務系統消費示意圖

下面,結合一個具體的操作例子來說明Wormhole CEP是如何檢測DDOS攻擊的。

首先,針對警告規則,設置一個窗口時間為10秒,次數為2次,判斷條件為流量超過45(GB)的CEP,作為第一個CEP,並將事件發生時間,以及次數1作為中間結果進行輸出;

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖2 設置警告CEP

然後,針對報警規則,再設置一個窗口為30秒,判斷條件為警告事件發生次數為2次作為第二個CEP。針對符合條件的事件,向Kafka中傳入報警消息,否則,不做任何操作。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖3 設置報警CEP

最終,設置完兩個CEP之後,它們將對所有流上事件進行疊加過濾,並針對符合條件的事件,向Kafka寫入報警消息,從而,協助各個數據中心預防DDOS攻擊。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖4 CEP列表

2、場景二:電商業務人工外呼通知

Wormhole CEP在運營中也起到了重要作用,比如在電商平臺中,客戶填寫提交訂單後,由於某些原因長時間未付款,這時需要人工介入處理,如給客戶打電話進行回訪,從而瞭解客戶情況,提高業務成交量及服務質量。下面以此業務場景為例,介紹如何通過Wormhole CEP來實現此類業務需求。

這裡將購物步驟簡化為兩步,第一步提交訂單,第二步付款。若某一客戶在提交訂單後,5min內未付款,則平臺通知工作人員聯繫客戶。假設事件流不斷流入Kafka中,事件中userid字段代表客戶ID;state字段代表訂單狀態(s1是“已提交”,s2是“已付款”)。通過CEP很容易實現上述需求,首先設置第一個Pattern,用來匹配某客戶提交訂單事件,即state=s1;然後設置第二個Pattern,用來匹配該客戶未付款事件,即相鄰的事件中state=s2未發生。滿足兩個規則數據即滿足需要人工外呼條件,這時系統發消息通知工作人員聯繫該客戶。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖5 電商平臺數據處理示意圖

針對需求,需要設置一個300s(即5min)的窗口,然後按照userid分組,對每個客戶分別進行匹配。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖6 CEP基本配置

第一個pattern為客戶已提交訂單。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖7 Pattern Begin

第二個Pattern為客戶未付款。

基於Flink計算引擎的流式處理平臺—關注低延遲和CEP

圖8 Pattern notNext

最終,該CEP將對所有流上事件進行過濾,並針對符合條件的事件,將數據發送到Kafka,人工外呼系統根據此數據觸發相關業務流程。

總的來說,Wormhole_v0.5主要是針對Flink實現了流式處理,關注點是低延遲和CEP。目前版本處理支持Flink SQL,Lookup SQL,CEP,並且支持三者的混合編排。同時,新增Spark Stream支持配置用戶自定義Topic,可直接對接DBus獨立拉全量功能。後續我們會盡快支持ums_extension(目前支持ums)、異構sink(目前支持Kafka)、udf等功能。歡迎大家持續關注!


分享到:


相關文章: