08.28 面對日近兩百億數據量,美圖大數據平臺如何建成?

面對日近兩百億數據量,美圖大數據平臺如何建成?

如今大數據在各行業的應用越來越廣泛:運營基於數據關注運營效果,產品基於數據分析關注轉化率情況,開發基於數據衡量系統優化效果……

美圖公司有美拍、美圖秀秀、美顏相機等十幾個app,每個app都會基於數據做個性化推薦、搜索、報表分析、反作弊、廣告等,整體對數據的業務需求比較多、應用也比較廣泛。

因此美圖數據技術團隊的業務背景主要體現在:業務線多以及應用比較廣泛。這也是促使我們搭建數據平臺的一個最主要的原因——由業務驅動。

一、美圖的數據應用案例

舉幾個美圖的數據應用案例:

面对日近两百亿数据量,美图大数据平台如何建成?

圖1

如圖1所示,左起第一張是美圖自研的數據可視化平臺DataFace,支持業務方自由拖拽生成可視化報表,便於高效地做數據報表以及後續的分析;中間是美拍APP的首頁“熱門個性化推薦”:基於用戶的行為數據,為用戶推薦可能喜歡、感興趣的視頻列表;右起第一張是基於用戶作弊的數據,根據一定的模型與策略進行反作弊,有效判斷、過濾用戶的作弊行為。除此之外,搜索、a/b實驗、渠道跟蹤、廣告等方面都有廣泛應用。

我們當前的數據量大致每月有5億活躍用戶,這些用戶每天產生接近200億條的行為數據,整體的量級相對來說比較大,集群機器達到千量級,還有PB級的歷史總數據量。

我們的業務線也比較多,而且各業務線都比較廣泛地運用了數據,加上整體的用戶規模也比較大,以上因素促使我們必須構建對應的數據平臺,來驅動這些業務增長,更高效地使用數據。

二、美圖數據平臺整體架構

如下圖2所示是我們數據平臺的整體架構:

面对日近两百亿数据量,美图大数据平台如何建成?

圖2

在數據收集這部分,我們構建了一套採集服務端日誌系統Arachnia,支持各app集成的客戶端SDK,負責收集app客戶端數據;同時也基於DataX實現數據集成(導入、導出);Mor爬蟲平臺支持可配置的爬取公網數據的任務開發。

數據存儲層主要是根據業務特點來選擇不同的存儲方案,目前主要有用到HDFS、MongoDB、Hbase、ES等。在數據計算部分,當前離線計算主要基於Hive&MR、實時流計算基於Storm、Flink以及一個自研的bitmap系統Naix。

數據開發這塊我們構建了一套數據工坊、數據總線分發、任務調度等平臺。

數據可視化與應用部分主要是基於用戶需求構建一系列數據應用平臺,包括:A/B實驗平臺、渠道推廣跟蹤平臺、數據可視化平臺、用戶畫像等等。

圖2右側是各組件都可能依賴的一些基礎服務,包括地理位置、元數據管理、唯一設備標識等。

如圖3所示是基本的數據架構流圖,使用典型的lamda架構。從左端數據源收集開始,Arachnia、AppSDK分別將服務端、客戶端數據上報到代理服務collector,通過解析數據協議,把數據寫到Kafka;然後實時流經過一層數據分發,最終業務消費Kafka數據進行實時計算。

面对日近两百亿数据量,美图大数据平台如何建成?

圖3

離線會由ETL服務負責從Kafka dump數據到HDFS,然後異構數據源(比如MySQL、Hbase等)主要基於DataX以及Sqoop進行數據的導入導出,接著通過Hive、Kylin、Spark等計算把數據寫入到各類的存儲層,最後通過統一的對外API對接業務系統和我們自己的可視化平臺等。

三、數據平臺的階段性發展

企業級數據平臺建設主要分三個階段:

第一階段:

剛開始是基本使用免費的第三方平臺,這個階段的特點是能快速集成並看到app的一些統計指標,但是缺點也很明顯,沒有原始數據、除了第三方提供的基本指標之外,其他分析、推薦等都無法實現。所以有從0到1的過程,讓我們自己有數據可以用。

第二階段:

在有數據可用後,業務線、需求量爆發,就需要提高開發效率,讓更多的人參與數據開發、使用到數據,不是僅侷限於數據研發人員使用,所以下一步就是把數據、計算存儲能力開放給各個業務線,而非握在自己手上。

第三階段:

在數據開放了以後,業務方會關注數據任務能否跑得更快,能否秒出,能否更實時;另外一方面,為了滿足業務需求,集群的規模越來越大,那麼在滿足業務的同時,也開始考慮如何實現資源更節省。

美圖現在處於第二與第三階段的過渡期,在不斷完善數據開放的同時,也在逐步提升查詢分析效率,並開始考慮如何優化成本。接下來我們重點介紹從0到1和數據開放這兩個階段我們平臺的實踐以及優化思路。

1、從0到1

從0到1是要解決從數據採集到數據最終可以使用。

如圖4所示是數據收集的演進過程,從剛開始使用類似umeng、flurry的免費第三方平臺,到後面快速使用rsync同步日誌到一臺服務器上存儲、計算,再到後面快速開發了一個簡單的Python腳本支持業務服務器上報日誌,最終我們開發了服務端日誌採集系統Arachnia以及客戶端AppSDK。

面对日近两百亿数据量,美图大数据平台如何建成?

圖4

數據採集是數據的源頭,在整個數據鏈路中是相對重要的環節,需要更多關注:數據是否完整、數據是否支持實時上報、數據埋點是否規範準確、以及維護管理成本。因此我們的日誌採集系統需要滿足以下需求:

  • 能集成管理維護,包括Agent能自動化部署安裝升級卸載、配置熱更、延遲方面的監控;

  • 在可靠性方面至少需要保證at least once;

  • 美圖現在有多IDC的情況,需要能支持多個IDC數據採集彙總到數據中心;

  • 在資源消耗方面儘量小,儘量做到不影響業務。


基於以上需求我們沒有使用flume、scribe、fluentd,而是選擇自己開發一套採集系統Arachnia。

如圖5是Arachnia的簡易架構圖,它通過系統大腦進行集中式管理。puppet模塊主要作為單個IDC內統一彙總Agent的metrics,中轉轉發的metrics或者配置熱更命令。採集器Agent主要是運維平臺負責安裝、啟動後從brain拉取到配置,並開始採集上報數據到collector。

面对日近两百亿数据量,美图大数据平台如何建成?

圖5

接著看Arachnia的實踐優化:首先是at least once的可靠性保證。不少的系統都是採用把上報失敗的數據通過WAL的方式記錄下來,重試再上報,以免上報失敗丟失。我們的實踐是去掉WAL,增加了coordinator來統一的分發管理tx狀態,如圖6:

面对日近两百亿数据量,美图大数据平台如何建成?

圖6

開始採集前會從coordinator發出txid,source接收到信號後開始採集,並交由sink發送數據,發送後ack tx,告訴coordinator已經commit。coordinator會進行校驗確認,然後再發送commit的信號給source、sink更新狀態,最終tx完source會更新採集進度到持久層(默認是本地file)。

該方式如果在前面3步有問題,則數據沒有發送成功,不會重複執行;如果後面4個步驟失敗,則數據會重複,該tx會被重放。

基於上文的at least once可靠性保證,有些業務方是需要唯一性的,我們這邊支持為每條日誌生成唯一ID標識。另外一個數據採集系統的主要實踐是:唯一定位一個文件以及給每條日誌做唯一的MsgID,方便業務方可以基於MsgID在發生日誌重複時能在後面做清洗。

我們一開始是使用filename,後面發現filename很多業務方都會變更,所以改為inode,但inode linux會回收重複利用,最後是以inode&文件頭部內容做hash來作為fileID。而MsgID是通過agentID & fileID & offset來唯一確認。

數據上報之後由collector負責解析協議推送數據到Kafka,那麼Kafka如何落地到HDFS呢? 首先看美圖的訴求:

  • 支持分佈式處理;

  • 涉及到較多業務線因此有多種數據格式,所以需要支持多種數據格式的序列化,包括json、avro、特殊分隔符等;

  • 支持因為機器故障、服務問題等導致的數據落地失敗重跑,而且需要有比較快速的重跑能力,因為一旦這塊故障,會影響到後續各個業務線的數據使用;

  • 支持可配置的HDFS分區策略,能支持各個業務線相對靈活的、不一樣的分區配置;

  • 支持一些特殊的業務邏輯處理,包括:數據校驗、過期過濾、測試數據過濾、注入等;


基於上述訴求痛點,美圖從Kafka落地到HDFS的數據服務實現方式如圖7所示:

面对日近两百亿数据量,美图大数据平台如何建成?

圖7

基於Kafka和MR的特點,針對每個kafka topic的partition,組裝mapper的inputsplit,然後起一個mapper進程處理消費這個批次的kafka數據,經過數據解析、業務邏輯處理、校驗過濾、最終根據分區規則落地寫到目標HDFS文件。落地成功後會把這次處理的meta信息(包括topic、partition、開始的offset、結束的offset)存儲到MySQL。下次再處理的時候,會從上次處理的結束的offset開始讀取消息,開始新一批的數據消費落地。

實現了基本功能後難免會遇到一些問題,比如不同的業務topic的數據量級是不一樣的,這樣會導致一次任務需要等待partition數據量最多以及處理時間最長的mapper結束,才能結束整個任務。

那我們怎麼解決這個問題呢?系統設計中有個不成文原則是:分久必合、合久必分,針對數據傾斜的問題我們採用了類似的思路。

首先對數據量級較小partition合併到一個inputsplit,達到一個mapper可以處理多個業務的partition數據,最終落地寫多份文件,如圖8所示:

面对日近两百亿数据量,美图大数据平台如何建成?

圖8

另外對數據量級較大的partition支持分段拆分,平分到多個mapper處理同一個partition,這樣就實現了更均衡的mapper處理,能更好地應對業務量級的突增,如圖9所示:

面对日近两百亿数据量,美图大数据平台如何建成?

圖9

除了數據傾斜的問題,還出現各種原因導致數據dump到HDFS失敗的情況,比如因為Kafka磁盤問題、Hadoop集群節點宕機、網絡故障、外部訪問權限等導致該ETL程序出現異常,最終可能導致因為未close HDFS文件導致文件損壞等,需要重跑數據。那我們的數據時間分區基本都是以天為單位,用原來的方式可能會導致一個天粒度的文件損壞,解析無法讀取。

我們採用了分兩階段處理的方式:mapper1先把數據寫到一個臨時目錄,mapper2把HDFS的臨時目錄的數據append到目標文件。這樣當mapper1失敗的時候可以直接重跑這個批次,而不用重跑整天的數據;當mapper2失敗的時候能直接從臨時目錄merge數據替換最終文件,減少了重新ETL天粒度的過程,如圖10所示:

面对日近两百亿数据量,美图大数据平台如何建成?

圖10

經由數據的實時分發訂閱寫入到Kafka1的數據基本是每個業務的全量數據,但是針對需求方大部分業務都只關注某個事件、某小類別的數據,而不是任何業務都消費全量數據做處理,所以我們增加了一個實時分發Databus來解決這個問題,如圖11所示:

面对日近两百亿数据量,美图大数据平台如何建成?

圖11

Databus支持業務方自定義分發rules往下游的Kafka集群寫數據,方便業務方訂閱處理自己想要的數據,並且支持更小粒度的數據重複利用。

圖12可以看出Databus的實現方式,它的主體基於Storm實現了databus topology:

面对日近两百亿数据量,美图大数据平台如何建成?

圖12

Databus有兩個spout,一個支持拉取全量以及新增的rules,然後更新到下游的分發bolt更新緩存規則,另外一個是從Kafka消費的spout。而distributionbolt主要是負責解析數據、規則match,以及把數據往下游的Kafka集群發送。

2、數據開放與平臺穩定性

有了原始數據並且能做離線、實時的數據開發以後,隨之而來的是數據開發需求的井噴,數據研發團隊應接不暇。所以我們通過數據平臺的方式開放數據計算、存儲能力,賦予業務方數據開發的能力。

實現元數據管理、任務調度、數據集成、DAG任務編排、可視化等不一一贅述,接下來我們主要介紹數據開放後,美圖對穩定性方面的實踐心得。

數據開放和系統穩定性是相愛相殺的關係:一方面,開放了之後不再是由有數據基礎的研發人員來做,經常會遇到提交非法、高資源消耗等問題的數據任務,給底層的計算、存儲集群的穩定性造成了比較大的困擾;另外一方面,其實也是因為數據開放,才不斷推進我們必須提高系統穩定性。

針對不少的高資源、非法的任務,我們首先考慮能否在HiveSQL層面做一些校驗、限制。如圖13所示是HiveSQL的整個解析編譯為可執行的MR的過程:

面对日近两百亿数据量,美图大数据平台如何建成?

圖13

首先基於Antlr做語法的解析,生成AST,接著做語義解析,基於AST會生成JAVA對象QueryBlock。基於QueryBlock生成邏輯計劃後做邏輯優化,最後生成物理計劃,進行物理優化後,最終轉換為一個可執行的MR任務。

我們主要在語義解析階段生成QueryBlock後,拿到它做了不少的語句校驗,包括:非法操作、查詢條件限制、高資源消耗校驗判斷等。

第二個在穩定性方面的實踐,主要是對集群的優化,包括:

  • 我們完整地對Hive、Hadoop集群做了一次升級。主要是因為之前在低版本有fix一些問題以及合併一些社區的patch,在後面新版本都有修復;另外一個原因是新版本的特性以及性能方面的優化。我們把Hive從0.13版本升級到2.1版本,Hadoop從2.4升級到2.7;

  • 對Hive做了HA的部署優化。我們把HiveServer和MetaStoreServer拆分開來分別部署了多個節點,避免合併在一個服務部署運行相互影響;

  • 之前執行引擎基本都是On MapReduce的,我們也在做Hive On Spark的遷移,逐步把線上任務從Hive On MR切換到Hive On Spark;

  • 拉一個內部分支對平時遇到的一些問題做bugfix或合併社區patch的特性;

在平臺穩定性方面的實踐最後一部分是提高權限、安全性,防止對集群、數據的非法訪問、攻擊等,如圖14所示:

面对日近两百亿数据量,美图大数据平台如何建成?

圖 14

提高權限主要分兩塊:API訪問與集群。

  • API Server:正如上文提到的,我們有OneDataAPI,提供給各個業務系統訪問數據的統一API。這方面主要是額外實現了一個統一認證CA服務,業務系統必須接入CA拿到token後來訪問OneDataAPI,OneDataAPI在CA驗證過後,合法的才允許真正訪問數據,從而防止業務系統可以任意訪問所有數據指標。

  • 集群:目前主要是基於Apache Ranger來統一各類集群,包括Kafka、Hbase、Hadoop等做集群的授權管理和維護。


以上就是美圖在搭建完數據平臺並開放給各個業務線使用後,對平臺穩定性做的一些實踐和優化。

三、總結

接下來我們對數據平臺建設過程做一個簡單的總結:

  • 首先在搭建數據平臺之前,一定要先了解業務,看業務的整體體量是否較大、業務線是否較廣、需求量是否多到嚴重影響我們的生產力。如果都是肯定答案,那可以考慮儘快搭建數據平臺,以便更高效、快速提高數據的開發應用效率。如果本身的業務量級、需求不多,不一定非得套大數據或者搭建多麼完善的數據平臺,以快速滿足支撐業務優先。

  • 在平臺建設過程中,需要重點關注數據質量、平臺的穩定性,比如關注數據源採集的完整性、時效性、設備的唯一標識,多在平臺的穩定性方面做優化和實踐,為業務方提供一個穩定可靠的平臺。

  • 在提高分析決策效率以及規模逐漸擴大後需要對成本、資源做一些優化和思考。



分享到:


相關文章: