「案例」攜程網-機票數據倉庫建設解決方案

一、前言

隨著大數據技術的飛速發展,海量數據存儲和計算的解決方案層出不窮,生產環境和大數據環境的交互日益密切。數據倉庫作為海量數據落地和扭轉的重要載體,承擔著數據從生產環境到大數據環境、經由大數據環境計算處理回饋生產應用或支持決策的重要角色。

數據倉庫的主題覆蓋度、性能、易用性、可擴展性及數據質量都是衡量數據倉庫解決方案好壞的重要指標。攜程機票部門數據倉庫也在不斷摸索向著這些目標砥礪前行。

二、攜程機票數據倉庫技術棧

攜程機票部門的數據倉庫建設主要基於公司公共部門的大數據基礎環境及數據調度平臺,輔以部分自運維的開源存儲引擎和基於開源組件二次開發的數據同步工具和運維工具。

2.1 數倉技術演進歷史

機票部門的數據倉庫源於 2008 年,當時生產環境數據落地主要使用 SQLServer,數據倉庫處理的目標數據體量不大,因此選擇的 SQLServer、Informaticas、Kettle 這樣的數據倉庫方案,數據模型設計及報表定製使用 SAP 的商用平臺 BO。

隨著機票業務系統的日益複雜,特別是生產環境引入消息中間件 Kafka 存儲日誌數據後,這套方案不可擴展性的缺點日趨明顯,SQLServer 的存儲和計算能力很大程度上限制了數倉數據的主題覆蓋度及性能。

在 2014 年,公司公共部門 hadoop 集群部署上線,並且引入了 zeus 調度平臺及 DataX 同步工具,各個 BU 的數據倉庫開始逐步轉為基於 Hive 建設。

隨著生產業務對實時監控、流量回放的需求增強,2016 年機票部門部署了 ElasticSearch,用以實時落地從 Kafka 同步的各個主流程服務日誌,並通過統一的交易標識 (transactionID) 串聯用戶的一次完整的搜索、下單等行為,用於生產排障和流量回放。基於 Hive 的搜索性能一直被廣泛詬病,特別是針對 adhoc 查詢,機票部門在 2016 年調研並部署了 Facebook 開源的基於內存和 Pipeline 的查詢引擎 Presto,在沒有享受到 local 數據獲取的前提下,查詢性能較原生的 Hive 引擎或者 Spark 引擎都有很大的提升。

在 2018 年,為了支持數倉數據的可視化運營平臺,我們先後引入了 ClickHouse 和 CrateDB 作為後臺的存儲和查詢引擎,特別是引入 CrateDB 以後,億級體量的表四個維度的聚合耗時 P90 下降到了 4 秒。

實時數據處理技術也經過了 Esper,Storm,Spark Streaming 和 Flink 的迭代,並慢慢收斂到 Flink。總體的技術演進歷史如圖 1 所示。

「案例」攜程網-機票數據倉庫建設解決方案

圖 1 數倉技術演進歷史


2.2 當前技術棧

生產環境的數據可以大致分成三類:

1)業務數據,主要存儲在 MySQL 和 SQLServer,在這些關係型數據庫裡面有數以萬計的表承接著各種生產服務的業務數據寫入;

2)基礎數據,也是存儲在 MySQL 和 SQLServer 中,生產應用時一般會建立一層中心化緩存(如 Redis)或者本地緩存;

3)日誌數據,這類數據的特點是”append only”,對已經生成的數據不會有更新的操作,考慮到這類數據的高吞吐量,生產環境一般會用消息隊列 Kafka 暫存;

數據倉庫在實施數據同步時,會根據需求在實時、近實時以及 T+1 天等不同的頻率執行數據同步,並且在大數據環境會用不同的載體承接不同頻率同步過來的數據。在攜程機票,實時同步的目標載體是 ElasticSearch、CrateDB 或者 HBase,近實時(一般 T+1 小時)或者 T+1 天的目標載體是 Hive。

從生產的數據載體來講,主要包括 DB 和消息隊列,他們的數據同步方案主要是:

1)生產 DB 到 Hive 的同步使用 taobao 開源的 DataX,DataX 由網站運營中心 DP 團隊做了很多擴展開發,目前支持了多種數據源之間的數據同步。實時同步的場景主要在 MySQL,使用 DBA 部門使用 Canal 解析並寫入至消息隊列的 bin log。

2)從 Kafka 到 Hive 同步使用 Camus,但是由於 Camus 的性能問題及消費記錄和消費過期較難監控的問題,我們基於 spark-sql-kafka 開發了 hamal,用於新建的 Kafka 到 Hive 的同步;Kafka 實時同步的載體主要是 ElasticSearch 或者 CrateDB,主要通過 Flink 實施。

生產數據被同步數據倉庫後,會在數倉內完成數據清洗、信息整合、聚合計算等數據扭轉流程,最終數據出倉導入到其它載體,這一系列的流程調度由公司 DP 團隊運維的調度平臺 Zeus 完成。

「案例」攜程網-機票數據倉庫建設解決方案

圖 2 攜程機票數倉技術棧


2.3 實時 VS 離線

當前機票部門的數據倉庫建設主要基於離線數據,一方面跟 OTA 銷售產品不屬於快消品相關,實時當前並不是剛需;另一方面實時處理場景下需要對計算資源、存儲資源穩定性有更高的要求,保持數據一致性的代價很大。結合兩方面,如果業務對實時需求不高就鋪開做實時數倉,ROI 很難達標。

當然,隨著攜程業務體量的增長,數據使用方對數據實時性要求日益增高,我們團隊在 2020 年也會探索實時數據倉庫的實施方案,並在一兩個重要的數據主題域上先行試點。

三、數據倉庫建設時涉及的共性問題

從團隊職能上來講,數據倉庫團隊需要負責從生產環境同步數據,在內部完成各層級的扭轉計算,參與所有數倉流程及報表的運維,並基於數倉公共數據層和應用數據層數據開發相關應用。

3.1 數據同步

為了保持數倉數據主題覆蓋足夠全面,我們部門幾乎將所有生產表和 Kafka topics 都同步到了 Hive。以下會對同步最常見的兩種場景 DB->Hive 和 Kafka->Hive 相關的實踐做介紹。

3.1.1 DB 同步到 Hive

特別對生產表到 Hive 的同步,人工配置腳本的方式顯然不能處理數以萬計的表,因此需要一個自動化的同步方案。自動同步方案需要不僅僅要解決自動創建表腳本、創建對應的同步腳本問題,還需要在當表結構發生變更的時候,能夠自動地感知表結構的變化,並且修改表結構和對應的同步腳本。

DB 到 Hive 同步需要依賴兩個數據源,1)Schema 表的元數據信息,簡單地包括各個字段信息、字段類型及主鍵定義;2)統計數據,它主要描述的是這個表在數據產生後有沒有 UPDATE 和 DELETE,這個決定著後續表的分區方式。

對業務型數據,一條數據生成後可能會有 Update,因為在數倉裡絕大部分場景需要用到數據的最新狀態,所以我們會用一個分區存放所有歷史數據的最新狀態,這類表我們稱之為歷史切片表。對日誌型數據,生產上數據產生後就不會有任何修改,我們會選擇使用增量分區,每個分區會放當天的增量數據。對基礎數據,整個表的數據增加、更新的頻率都非常低,在 ods 層我們會每天全量同步一份到最新數據分區,並且會建立一個無分區的下游維表,將數據狀態為有效的數據放到這張下游無分區維表中方便流程使用。

有了上述這兩個數據源以後,我們會根據 DBA Schema 服務返回的元數據信息生成 Hive 表的腳本,並調度執行生成新的 Hive 表,再依據統計數據決定表的分區方式,進而生成對應新建表的同步腳本。當表創建或者表結構發生變更的時候,通過 Schema 服務兩天輸出的比對,我們會發現表結構的變更並映射到對應 Hive 表結構變更,同時可以改變對應的同步腳本。還有一種思路是可以通過 DB 發佈系統的日誌,獲知每天 DB 創建、表創建以及表結構變化的增量。

「案例」攜程網-機票數據倉庫建設解決方案

圖 3 生產 DB 到 Hive 的同步


有一個坑點就是生產物理刪除,如果出現了物理刪除並且需要在 Hive 表裡將刪除數據識別並標記出來,當前可能需要通過全量同步的方法(考慮到從生產環境取數的代價,全量同步業務主鍵字段即可)解決,特別對 SQLServer。因此可以跟生產的開發協商儘量使用邏輯刪除,這樣數倉對刪除數據的感知代價會小很多。

3.1.2 Kafka 同步到 Hive

當前我們非實時同步主要在使用 Linkedin 很久以前的一個工具 Camus,當然 DP 團隊經過優化和企業本地化二次開發。但從使用感受來看,Camus 會有如下可能不足的地方:

1)基於 mapreduce,mapreduce 在 yarn 集群上搶佔資源的能力較弱,在資源競爭高峰會有同步變慢的情況發生;

2)消費記錄存儲在 HDFS 各個文件裡,這樣對消費記錄的獲取和針對消費過期的監控都很不方便;

3)Kafka Topic 和 Hive 表的血緣關係獲取不方便;

因此,我們基於 spark-sql-kafka 開發 hamal,旨在解決如上痛點並且讓配置更加的簡潔。實現的過程大概包括,spark-sql-kafka 會根據輸入的任務從 Kafka 各個 Partition 消費出 payload 數據,對每條 payload 執行解編碼、解壓、magic code 等操作,此時會將 payload 數據轉化成 json 字符串,這個 json 字符串可以直接作為一個字段寫入到 Hive 表裡,也可以根據事先配置提取出對應的節點和值作為列和列值寫入到 Hive 中,甚至可以通過 Json 的 Schema 推斷出 Hive 表結構,並將 Json 各節點對應寫到 Hive 表的各列中。

「案例」攜程網-機票數據倉庫建設解決方案

圖 4 轉化為 json 字符串 RDD 代碼示例


如果選擇推斷的模式,實現的時候可以使用 sampling 的方式,類似 spark jsonRDD 第二個參數,比如說 0.001,Hamal 可以直接指定採樣數據條數,從 Kafka topic 中拉取出來,通過 jsonRDD 推斷出 StructType,並映射成 Hive 建表語句。對於建好的表,通過表的字段匹配獲取數據,最終寫入 Hive 表,最後會提交消費記錄到一張 Hive 的 ConsumerRecord 表裡面。這樣其實基於這個表,我們既可以獲取 Kafka topic 和 Hive 表的血緣,也可以方便地監控每次同步的數據量。

「案例」攜程網-機票數據倉庫建設解決方案

圖 5 Kafka 同步至 Hive Hamal 設計


3.2 數倉分層

分層設計主要參考公司推行的數據規範,將數據倉庫的流程分成了生產鏡像層 (ods)、中間層 (edw)、公共數據層 (cdm) 及應用數據層 (adm)。在中間層對 ods 表做異常數據剔除、NULL 值處理、枚舉值統一等數據清理和綁定維表信息工作,在公共數據層對中間層表進行進一步的整合,豐富表主題的維度和度量,一般以寬表的形式呈現,用以後續的 adhoc 取數、報表。

根據機票本身的業務特點,我們將數據劃分成流量、產量、收益、生產 KPI、業務考核等幾大主題域,對數據表的業務分類和有效管理有重要意義。

「案例」攜程網-機票數據倉庫建設解決方案

圖 6 數倉分層設計


3.3 數據解析

數據在同步至數據 ods 層後,產品經常會提的一個需求是將 ods 層某個含報文字段的表按照字段設計展開,如果要支持此類需求,數據開發就需要了解生產上這個表各個字段含義及報文字段的契約定義,而這些對應表的寫入開發非常熟悉。因此,為了提高整體的工作效率,我們開發了一套數據解析框架,對業務開發封裝了大數據組件的 API 調用及相關參數調整,讓業務開發更高效地完成熟悉的單條數據解析開發。

「案例」攜程網-機票數據倉庫建設解決方案

圖 7 數據解析框架


3.4 數倉運維工具

數據倉庫擁有所有生產表的鏡像表、數以萬計的生產數據同步流程、數據扭轉流程以及後續報表,對如此規模的數倉實體的管理和運維需要一個不斷迭代的系統支持,從而可以大幅度提高數據工程師的效率。

我們根據數倉建設中遇到的一些費力度較高且需要重複做的操作,開發了一套運維工具集合,目前還在持續迭代中。運維工具集功能主要包括數據實體通用搜索,報表收件人批量變更,維表導入,Oncall 錄入,腳本模板生成,序列化與反序列化等等。工具開發難度不大,但對提高效率的幫助很大。

四、數據質量體系

對龐大的數據倉庫實體建設完善的數據質量監控體系,光靠人工 one by one 設置檢驗規則是不夠的,需要對幾乎所有的實體建立相應的監控,並且不能給大數據集群帶來很多額外的計算代價。當這樣的覆蓋面很廣的監控完善後,配合著元數據信息,就有可能在故障的 Root Cause 點第一時間發現故障,並可以清晰地知曉故障的影響範圍以及故障恢復的流程優先級調度。

因此,建立完善的數據質量體系需要完善元數據管理,建立輕量的覆蓋面廣的質量監控,並且對特別重要的流程,需要增加額外的業務相關校驗。

4.1 元數據管理

在生產環境和大數據環境存在多種實體,這些實體包括應用、各類表 (如 SQLServer、MySQL、MongoDB 的表等)、消息隊列 topic、ElasticSearch 的 index、Hive 的表等等,這些實體相互關聯,共同支撐著線上的系統,線下的分析。對這些信息的治理,實體的元數據管理至關重要。

在數倉體系中,元數據主要包含基礎信息、血緣關係以及標籤。基礎信息跟數據表相關,具體包括表的字段、存儲、分區類型等;血緣會涉及到各類的實體,表、流程、報表、郵件推送等,這些實體之間存在著上下游調用與被調用關係,成體系地管理好這些實體之間的關係,可以清晰地瞭解到數倉邊界,使得對故障的 Root Cause 追溯以及該 Root Cause 帶來的影響面評估非常便捷。標籤是對實體的分類描述,如層級是屬於哪一層,安全是否有涉密,重要等級,是否有非常重要的流程在上面,業務標籤是屬於訂單、前端還是訂後。

4.2 數據質量相關因素

數據質量的問題其實一般可以在流程執行的日誌中看出端倪,因為人工排查故障的時候,除了常規通過 SQL 查詢驗證表的增量、業務主鍵、某些字段值是否正常,另外一個有效手段就是分析運行日誌。

從運行日誌中可以獲取以下信息,流程的開始時間、截止時間流程執行時間、完成狀態、每天增量的字節數、增量條數,引擎執行的參數,在用 Spark 或者 MapReduce 執行時消耗資源的情況等等一系列特徵。通過對各類計算引擎產生日誌的分析,可以獲得各類引擎下記錄日誌數據的 pattern,從而提取出相關的特徵信息。遇到特殊的流程或者引擎,可以借用其他手段補齊特徵數據,如用 SQL,用 Hadoop 的命令。

「案例」攜程網-機票數據倉庫建設解決方案

圖 8 數據質量相關特徵


這是我們簡單的一個日誌輸出,第一張是 Spark 的執行日誌,下面一張是 MapReduce 的執行日誌。

「案例」攜程網-機票數據倉庫建設解決方案

「案例」攜程網-機票數據倉庫建設解決方案

圖 9 MR 和 Spark 引擎執行日誌示例


有了數據質量特徵提取的邏輯,實時流程異常發現可以如下實施:我們可以將質量特徵數據計算分成兩塊,一塊是實時的針對單個流程日誌的解析出相關特徵,一塊是離線的基於歷史特徵數據的統計。我們從消息隊列中消費實時獲取執行完成的流程 id 和 actionid,通過運維團隊提供的詳情日誌查詢接口獲取完整日誌,通過特徵解析邏輯,解析出實時的流程質量相關特徵,匹配歷史數據,應用規則。當滿足異常規則,可以通過元數據信息中的血緣判斷影響的範圍,推送告警信息。

「案例」攜程網-機票數據倉庫建設解決方案

圖 10 實時流程異常監控實施方案


五、應用案例

攜程作為平臺方,對機票價格沒有定價權,價格由產品提供方來提供。在每年航班計劃換季的時候,產品提供方會有一小部分概率將價格錄入錯。錯誤的運價,特別是很低的錯誤運價會讓航司或供應商蒙受超大的損失。本著公平交易的原則,攜程作為銷售平臺,做了機票價格監控系統。上線至今,發現了數十起價格異常事件。

在生產的消息隊列中,我們落地了用戶查詢返回的所有航班組合和價格信息,數據倉庫完成近實時同步,將數據解析處理成異常價格相關特徵集,每天的增量在百億級別。我們從 Kafka 實時消費兩類日誌數據,一類是查詢日誌,一類是下單日誌,建立匹配,建立規則集發現可疑的低價交易標識,並且進一步監控跟交易標識是否進入下單流程。當某個疑似異常特徵帶來的訂單超過一定閾值時,系統會對這疑似異常特徵對應的查詢進行自動禁售。

「案例」攜程網-機票數據倉庫建設解決方案

圖 11 價格監控系統


六、小結

一套完整的數據倉庫實施方案應該包括但不侷限於上面介紹的數據同步方案、數據存儲方案、數據規範、元數據建設、數據質量體系、運維工具等,每個實施團隊應該根據面臨的實際情況選擇針對每個點的具體技術方案。

攜程機票數據倉庫團隊也正朝著建設全面、規範、易用、高效、精準的數倉路上探索前行,當前在數據同步、數倉數據扭轉以及出倉應用方面的實踐方案還在隨著需求的變化而迭代。接下來,我們團隊會著重在數據倉庫規範徹底落地以及實時數倉實施這些方向上努力。



分享到:


相關文章: