Pick!閒魚億級商品庫中的秒級實時選品

一、業務背景

在電商運營工作中,營銷活動是非常重要的部分,對用戶增長和GMV都有很大幫助。對電商運營來說,如何從龐大的商品庫中篩選出賣家優質商品並推送給有需要的買家購買是每時每刻都要思索的問題,而且這個過程需要儘可能快和實時。保證快和實時就可以提升買賣雙方的用戶體驗,提高用戶粘性。

二、實時選品

為了解決上面提到的問題,閒魚研發了馬赫系統。馬赫是一個實時高性能的商品選品系統,解決在億級別商品中通過規則篩選優質商品並進行投放的場景。有了馬赫系統之後,閒魚的運營同學可以在馬赫系統上創建篩選規則,比如商品標題包含“小豬佩奇”、類目為“玩具”、價格不超過100元且商品狀態為未賣出。在運營創建規則後,馬赫系統會同時進行兩步操作,第一步是從存量商品數據篩選符合條件的商品進行打標;第二步是對商品實時變更進行規則計算,實時同步規則命中結果。

馬赫系統最大的特點是快而實時,體現在命中規模為100w的規則可以在10分鐘之內完成打標;商品本身變更導致的規則命中結果同步時間為1秒鐘。運營可以通過馬赫系統快速篩選商品向用戶投放,閒魚的流量也可以精準投給符合條件的商品並且將流量利用到最大化。

那麼馬赫系統是如何解決這一典型的電商問題的呢,馬赫系統和流計算有什麼關係呢,這是下面要詳細說明的部分。

三、流計算

流計算是持續、低延遲、事件觸發的數據處理模型。流計算模型是使用實時數據集成工具,將數據實時變化傳輸到流式數據存儲,此時數據的傳輸變成實時化,將長時間累積大量的數據平攤到每個時間點不停地小批量實時傳輸;流計算會將計算邏輯封裝為常駐計算服務,一旦啟動就一直處於等待事件觸發狀態,當有數據流入後會觸發計算迅速得到結果;當流計算得到計算結果後可以立刻將數據輸出,無需等待整體數據的計算結果。

Pick!閒魚億級商品庫中的秒級實時選品

閒魚實時選品系統使用的流計算框架是Blink,Blink是阿里巴巴基於開源流計算框架Flink定製研發的企業級流計算框架,可以認為是Flink的加強版,現在已經開源。Flink是一個高吞吐、低延遲的計算引擎,同時還提供很多高級功能。比如它提供有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理等特性,為閒魚實時選品系統的超低延時選品提供了有力支持。

Pick!閒魚億級商品庫中的秒級實時選品

3.1、Blink之State

State是指流計算過程中計算節點的中間計算結果或元數據屬性,比如在aggregation過程中要在state中記錄中間聚合結果,比如Apache Kafka作為數據源時候,我們也要記錄已經讀取記錄的offset,這些State數據在計算過程中會進行持久化(插入或更新)。所以Blink中的State就是與時間相關的,Blink任務的內部數據(計算數據和元數據屬性)的快照。

馬赫系統會在State中保存商品合併之後的全部數據和規則運行結果數據。當商品發生變更後,馬赫系統會將商品變更信息與State保存的商品信息進行合併,並將合併的信息作為入參運行所有規則,最後將規則運行結果與State保存的規則運行結果進行Diff後得到最終有效的運行結果。所以Blink的State特性是馬赫系統依賴的關鍵特性。

3.2、Blink之Window

Blink的Window特性特指流計算系統特有的數據分組方式,Window的創建是數據驅動的,也就是說,窗口是在屬於此窗口的第一個元素到達時創建。當窗口結束時候刪除窗口及狀態數據。Blink的Window主要包括兩種,分別為滾動窗口(Tumble)和滑動窗口(Hop)。

滾動窗口有固定大小,在每個窗口結束時進行一次數據計算,也就是說滾動窗口任務每經過一次固定週期就會進行一次數據計算,例如每分鐘計算一次總量。

Pick!閒魚億級商品庫中的秒級實時選品

滑動窗口與滾動窗口類似,窗口有固定的size,與滾動窗口不同的是滑動窗口可以通過slide參數控制滑動窗口的新建頻率。因此當slide值小於窗口size的值的時候多個滑動窗口會重疊,此時數據會被分配給多個窗口,如下圖所示:

Pick!閒魚億級商品庫中的秒級實時選品

Blink的Window特性在數據計算統計方面有很多使用場景,馬赫系統主要使用窗口計算系統處理數據的實時速度和延時,用來進行數據統計和監控告警。

3.3、Blink之UDX

UDX是Blink中用戶自定義函數,可以在任務中調用以實現一些定製邏輯。Blink的UDX包括三種,分別為:

  • UDF - User-Defined Scalar Function
  • UDF是最簡單的自定義函數,輸入是一行數據的任意字段,輸出是一個字段,可以實現數據比較、數據轉換等操作。
  • UDTF - User-Defined Table-Valued Function
  • UDTF 是表值函數,每個輸入(單column或多column)返回N(N>=0)Row數據,Blink框架提供了少量的UDTF,比如:STRING_SPLIT,JSON_TUPLE和GENERATE_SERIES3個built-in的UDTF。
  • UDAF - User-Defined Aggregate Function
  • UDAF是聚合函數,輸入是多行數據,輸出是一個字段。Blink框架Built-in的UDAF包括MAX,MIN,AVG,SUM,COUNT等,基本滿足了80%常用的集合場景,但仍有一定比例的複雜業務場景,需要定製自己的聚合函數。

馬赫系統中使用了大量的UDX進行邏輯定製,包括消息解析、數據處理等。而馬赫系統最核心的商品數據合併、規則運行和結果Diff等流程就是通過UDAF實現的。

四、秒級選品方案

選品系統在項目立項後也設計有多套技術方案。經過多輪討論後,最終決定對兩套方案實施驗證後決定最終實現方案。

第一套方案是基於PostgreSQL的方案,PostgreSQL可以很便捷的定義Function進行數據合併操作,在PostgreSQL的trigger上定義執行規則邏輯。基於PostgreSQL的技術實現較複雜,但能滿足功能需求。不過性能測試結果顯示PostgreSQL處理小數據量(百萬級)性能較好;當trigger數量多、trigger邏輯複雜或處理億級別數據時,PostgreSQL的性能會有較大下滑,不能滿足秒級選品的性能指標。因此基於PostgreSQL的方案被否決(在閒魚小商品池場景中仍在使用)。

第二套方案是基於Blink流計算方案,通過驗證發現Blink SQL很適合用來表達數據處理邏輯而且Blink性能很好,綜合對比之後最終選擇Blink流計算方案作為實際實施的技術方案。

為了配合使用流計算方案,馬赫系統經過設計和解耦,無縫對接Blink計算引擎。其中數據處理模塊是馬赫系統核心功能模塊,負責接入商品相關各類數據、校驗數據、合併數據、執行規則和處理執行結果並輸出等步驟,所以數據處理模塊的處理速度和延時在很大程度上能代表馬赫系統數據處理速度和延時。接下來我們看下數據處理模塊如何與Blink深度結合將數據處理延遲降到秒級。

Pick!閒魚億級商品庫中的秒級實時選品

數據處理模塊結構如上圖,包含數據接入層、數據合併層、規則運行層和規則運行結果處理層。每層都針對流計算處理模式進行了單獨設計。

4.1、數據接入層

Pick!閒魚億級商品庫中的秒級實時選品

數據接入層是數據處理模塊前置,負責對接多渠道各種類型的業務數據,主要邏輯如下:

  • 數據接入層對接多個渠道多種類型的業務數據;
  • 解析業務數據並做簡單校驗;
  • 統計各渠道業務數據量級並進行監控,包括總量和同比變化量;
  • 通過元數據中心獲取字段級別的Metadata配置。元數據中心是用來保存和管理所有字段的MetaData配置信息組件。Metadata配置代表字段元數據配置,包括字段值類型,值範圍和值格式等基礎信息;
  • 根據Metadata配置進行字段級別數據校驗;
  • 按照馬赫定義的標準數據範式組裝數據。

這樣設計的考慮是因為業務數據是多種多樣的,比如商品信息包括數據庫的商品表記錄、商品變更的MQ消息和算法產生的離線數據,如果直接通過Blink對接這些業務數據源的話,需要創建多個Blink任務來對接不同類型業務數據源,這種處理方式太重,而且數據接入邏輯與Blink緊耦合,不夠靈活。

數據接入層可以很好的解決上述問題,數據接入層可以靈活接入多種業務數據,並且將數據接入與Blink解耦,最終通過同一個Topic發出消息。而Blink任務只要監聽對應的Topic就可以連續不斷的收到業務數據流,觸發接下來的數據處理流程。

4.2、數據合併層

Pick!閒魚億級商品庫中的秒級實時選品

數據合併是數據處理流程的重要步驟,數據合併的主要作用是將商品的最新信息與內存中保存的商品信息合併供後續規則運行使用。數據合併主要邏輯是:

  • 監聽指定消息隊列Topic,獲取業務數據消息;
  • 解析消息,並將消息內容按照字段重新組裝數據,格式為{key:[timestamp, value]},key是字段名稱,value是字段值,timestamp為字段數據產生時間戳;
  • 將組裝後的數據和內存中保存的歷史數據根據timestamp進行字段級別數據合併,合併算法為比較timestamp大小取最新字段值,具體邏輯見下圖。
Pick!閒魚億級商品庫中的秒級實時選品

數據合併有幾個前提:

  1. 內存可以保存存量數據;
  2. 這個是Blink提供的特性,Blink可以將任務運行過程中產生的存量數據保存在內存中,在下一次運行時從內存中取出繼續處理。
  3. 合併後的數據能代表商品的最新狀態;
  4. 這點需要一個巧妙設計:商品信息有很多字段,每個字段的值是數組,不僅要記錄實際值,還要記錄當前值的修改時間戳。在合併商品信息時,按照字段進行合併,合併規則是取時間戳最大的值為準。

舉例來說,內存中保存的商品ID=1的信息是{"desc": [1, "描述1"], "price": [4, 100.5]},數據流中商品ID=1的信息是{"desc": [2, "描述2"], "price": [3, 99.5]},那麼合併結果就是{"desc": [2, "描述2"], "price": [4, 100.5]},每個字段的值都是最新的,代表商品當前最新信息。

當商品信息發生變化後,最新數據由數據接入層流入,通過數據合併層將數據合併到內存,Blink內存中保存的是商品當前最新的全部數據。

4.3、規則運行層

Pick!閒魚億級商品庫中的秒級實時選品

規則運行層是數據處理流程核心模塊,通過規則運算得出商品對各規則命中結果,邏輯如下:

  • 規則運行層接受輸入為經過數據合併後的數據;
  • 通過元數據中心獲取字段級別Metadata配置;
  • 根據字段Metadata配置解析數據;
  • 通過規則中心獲取有效規則列表,規則中心是指創建和管理規則生命週期的組件;
  • 循環規則列表,運行單項規則,將規則命中結果保存在內存;
  • 記錄運行規則拋出異常的數據,並進行監控告警。

這裡的規則指的是運營創建的業務規則,比如商品價格大於50且狀態為在線。規則的輸入是經過數據合併後的商品數據,輸出是true或false,即是否命中規則條件。規則代表的是業務投放場景,馬赫系統的業務價值就是在商品發生變更後儘快判斷是否命中之前未命中的規則或是不命中之前已經命中的規則,並將命中和不命中結果儘快體現到投放場景中。

規則運行需利用Blink強大算力來保證快速執行,馬赫系統當前有將近300條規則,而且還在快速增長。這意味著每個商品發生變更後要在Blink上運行成百上千條規則,閒魚每天有上億商品發生變更,這背後需要的運算量是非常驚人的。

4.4、運行結果處理層

讀者讀到這裡可能會奇怪,明明經過規則運行之後直接把運行結果輸出到投放場景就可以了,不需要運行結果處理層。實際上運行結果處理層是數據處理模塊最重要的部分。

Pick!閒魚億級商品庫中的秒級實時選品

因為在實際場景中,商品的變更在大部分情況只會命中很少一部分規則,而且命中結果也很少會變化。也就是說商品對很多規則的命中結果是沒有意義的,如果將這些命中結果也輸出的話,只會增加操作TPS,對實際結果沒有任何幫助。而篩選出有效的運行結果,這就是運行結果處理層的作用。運行結果處理層邏輯如下:

  • 獲取商品數據的規則運行結果;
  • 按照是否命中規則解析運行結果;
  • 將運行結果與內存中保存的歷史運行結果進行diff,diff作用是排除新老結果中相同的命中子項,邏輯見下圖。
Pick!閒魚億級商品庫中的秒級實時選品

運行結果處理層利用Blink內存保存商品上一次變更後規則運行結果,並將當前變更後規則運行結果與內存中結果進行比較,計算出有效運行結果。舉例來說,商品A上一次變更後規則命中結果為{"rule1":true, "rule2":true, "rule3":false, "rule4":false},當前變更後規則命中結果為{"rule1":true, "rule2":false, "rule3":false, "rule4":true}。因為商品A變更後對rule1和rule3的命中結果沒有變化,所以實際有效的命中結果是{"rule2":false, "rule4":true},通過運行結果處理層處理後輸出的是有效結果的最小集,可以極大減小無效結果輸出,提高數據處理的整體性能和效率。

4.5、難點解析

雖然閒魚實時選品系統在立項之初經過預研和論證,但因為使用很多新技術框架和流計算思路,在開發過程中遇到一些難題,包括設計和功能實現方面的,很多是設計流計算系統的典型問題。我們就其中一個問題與各位讀者探討-規則公式轉換。

4.5.1、規則公式轉換

這個問題的業務場景是:運營同學在馬赫系統頁面上篩選商品字段後保存規則,服務端是已有的老系統,邏輯是根據規則生成一段SQL,SQL的where條件和運營篩選條件相同。SQL有兩方面的作用,一方面是作為離線規則,在離線數據庫中執行SQL篩選符合規則的離線商品數據;另一方面是轉換成在線規則,在Blink任務中對實時商品變更數據執行規則以判斷是否命中。

因為實時規則運行使用的是MVEL表達式引擎,MVEL表達式是類Java語法的,所以問題就是將離線規則的SQL轉換成在線規則的Java表達式,兩者邏輯需一致,並且需兼顧性能和效率。問題的解決方案很明確,解析SQL後將SQL操作符轉換成Java操作符,並將SQL特有語法轉成Java語法,例如A like '%test%'轉成A.contains('test')。

這個問題的難點是如何解析SQL和將解析後的語義轉成Java語句。經過調研之後給出了簡單而優雅的解決方案,主要步驟如下:

  • 使用Druid框架解析SQL語句,轉成一個二叉樹,單獨取出其中的where條件子樹;
  • 通過後序遍歷算法遍歷where條件子樹;
  • 將SQL操作符換成對應的Java操作符;
  • 目前支持且、或、等於、不等於、大於、大於等於、小於、小於等於、like、not like和in等操作。
  • 將SQL語法格式轉成Java語法;
  • 將in語法改成Java的或語法,例如A in ('hello', 'world')轉成(A == 'hello') || (A == 'world')。

實際運行結果如下:

Pick!閒魚億級商品庫中的秒級實時選品

代碼邏輯如下(主要是二叉樹後續遍歷和操作符轉換,不再詳細解釋):

Pick!閒魚億級商品庫中的秒級實時選品

五、結論

馬赫系統上線以來,已經支持近400場活動和投放場景,每天處理近1.4億條消息,峰值TPS達到50000。馬赫系統已經成為閒魚選品投放的重要支撐。

本文主要闡述馬赫系統中數據處理的具體設計方案,說明整體設計的來龍去脈。雖然閒魚實時選品系統針對的是商品選品,但數據處理流計算技術方案的輸入是MQ消息,輸出也是MQ消息,不與具體業務綁定,所以數據處理流計算技術方案不只適用於商品選品,也適合其他類似實時篩選業務場景。希望我們的技術方案和設計思路能給你帶來一些想法和思考,也歡迎和我們留言討論,謝謝。

  • 閒魚實時選品系統:https://mp.weixin.qq.com/s/8ROsZniYD7nIQssC14mn3w
  • Blink:https://github.com/apache/flink/tree/blink
  • PostgreSQL:https://www.postgresql.org/
  • druid:https://github.com/alibaba/druid


分享到:


相關文章: