最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

ClickHouse 是由號稱“俄羅斯 Google”的 Yandex 公司開源的面向 OLAP 的分佈式列式數據庫,能夠使用 SQL 查詢生成實時數據報告。

本文整理自字節跳動高級研發工程師陳星在 QCon 全球軟件開發大會(北京站)2019 上的演講,他介紹了 ClickHouse 的關鍵技術點、在字節跳動的應用場景以及主要的技術改進。

ClickHouse 簡介

ClickHouse 是由號稱“俄羅斯 Google”的 Yandex 開發而來,在 2016 年開源,在計算引擎裡算是一個後起之秀,在內存數據庫領域號稱是最快的。大家從網上也能夠看到,它有幾倍於 GreenPlum 等引擎的性能優勢。

如果大家研究過它的源碼,會發現其實它採用的技術並不新。ClickHouse 是一個列導向數據庫,是原生的向量化執行引擎。它在大數據領域沒有走 Hadoop 生態,而是採用 Local attached storage 作為存儲,這樣整個 IO 可能就沒有 Hadoop 那一套的侷限。它的系統在生產環境中可以應用到比較大的規模,因為它的線性擴展能力和可靠性保障能夠原生支持 shard + replication 這種解決方案。它還提供了一些 SQL 直接接口,有比較豐富的原生 client。另外就是它比較快。

大家選擇 ClickHouse 的首要原因是它比較快,但其實它的技術沒有什麼新的地方,為什麼會快?我認為主要有三個方面的因素:

  1. 它的數據剪枝能力比較強,分區剪枝在執行層,而存儲格式用局部數據表示,就可以更細粒度地做一些數據的剪枝。它的引擎在實際使用中應用了一種現在比較流行的 LSM 方式。
  2. 它對整個資源的垂直整合能力做得比較好,併發 MPP+ SMP 這種執行方式可以很充分地利用機器的集成資源。它的實現又做了很多性能相關的優化,它的一個簡單的匯聚操作有很多不同的版本,會根據不同 Key 的組合方式有不同的實現。對於高級的計算指令,數據解壓時,它也有少量使用。
  3. 我當時選擇它的一個原因,ClickHouse 是一套完全由 C++ 模板 Code 寫出來的實現,代碼還是比較優雅的。

字節跳動如何使用 ClickHouse

頭條做技術選型的時候為什麼會選用 ClickHouse?這可能跟我們的應用場景有關,下面簡單介紹一下 ClickHouse 在頭條的使用場景。

頭條內部第一個使用 ClickHouse 的是用戶行為分析系統。該系統在使用 ClickHouse 之前,engine 層已經有兩個迭代。他們嘗試過 Spark 全內存方案還有一些其他的方案,都存在很多問題。主要因為產品需要比較強的交互能力,頁面拖拽的方式能夠給分析師展示不同的指標,查詢模式比較多變,並且有一些查詢的 DSL 描述,也不好用現成的 SQL 去表示,這就需要 engine 有比較好的定製能力。

行為分析系統的表可以打成一個大的寬表形式,join 的形式相對少一點。系統的數據量比較大,因為產品要支持頭條所有 APP 的用戶行為分析,包含頭條全量和抖音全量數據,用戶的上報日誌分析,面臨不少技術挑戰。大家做了一些調研之後,在用 ClickHouse 做一些簡單的 POC 工作,我就拿著 ClickHouse 按需求開始定製了。

綜合來看,從 ClickHouse 的性能、功能和產品質量來說,效果還不錯,因為開發 ClickHouse 的公司使用的場景實際上跟頭條用戶分析是比較類似的,因此有一定的借鑑意義。

目前頭條 ClickHouse 集群的規模大概有幾千個節點,最大的集群規模可能有 1200 個節點,這是一個單集群的最大集群節點數。數據總量大概是幾十個 PB,日增數據 100TB,落地到 ClickHouse,日增數據總量大概是它的 3 倍,原始數據也就 300T 左右,大多數查詢的響應時間是在幾秒鐘。從交互式的用戶體驗來說,一般希望把所有的響應控制在 30 秒之內返回,ClickHouse 基本上能夠滿足大部分要求。覆蓋的用戶場景包括產品分析師做精細化運營,開發人員定位問題,也有少量的廣告類客戶。

圖 1 是一個 API 的框架圖,相當於一個統一的指標出口,也提供服務。圍繞著 ClickHouse 集群,它可以支撐不同的數據源,包括離線的數據、實時的消息中間件數據,也有些業務的數據,還有少量高級用戶會直接從 Flink 上消費一些 Databus 數據,然後批量寫入,之後在它外圍提供一個數據 ETL 的 Service,定期把數據遷移到 ClickHouse local storage 上,之後他們在這之上架了一個用戶使用分析系統,也有自研的 BI 系統做一些多維分析和數據可視化的工作,也提供 SQL 的網關,做一些統一指標出口之類的工作,上面的用戶可能是多樣的。

綜合來說,我們希望在頭條內部把 ClickHouse 打造成為支持數據中臺的查詢引擎,滿足交互式行為的需求分析,能夠支持多種數據源,整個數據鏈路對業務做到透明。在工作過程中,我們也碰到了很多的問題。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 1 Bytedance 如何使用 ClickHouse

問題與解決方案

接下來我會詳細介紹我們在使用 ClickHouse 的過程中碰到過什麼問題,希望對大家有一些借鑑意義。

數據源到 ClickHouse 服務化

我們在做 ClickHouse 服務化的過程中,第一步就是如何把數據落到 ClickHouse 集群中。原生的 ClickHouse 沒有 HDFS 訪問能力,我們同時還需要保證對用戶透明,就可能存在幾個問題:

第一,怎麼訪問離線數據?

第二,ClickHouse 沒有事務支持,如果在數據導入過程中發生了 Fail,如何做 Fail over?

第三,ClickHouse 數據就緒速度。我們整個數據就緒的壓力很大,上游就緒的時間比較晚,每天早上就會有一些分析師在 ClickHouse 上看指標,整個數據落到 ClickHouse 留給我們的空擋可能不是太長。

我們針對這些問題做了一些改動。第一,從 HAWQ 上移植過來 HDFS client,讓 ClickHouse 能夠直接訪問數據,我們的 ETL 服務實際上維護了一套外部事務的邏輯,然後做數據一致性的保證;為了保證就緒時間,我們充分利用各個節點的計算能力和數據的分佈式能力,實際上最終都會在外圍服務把數據作一些 Repartition,直接寫入各個節點本地表。另外,我們還有一些國際化的場景,像 TikTok、 Musical.ly 等,數據就緒和分析師分析的時間是有重疊的,數據寫和查詢交互的影響還是有一些。我們最近也在嘗試把數據構建和查詢分離出來,並開發相應的 Feature,但是還沒有上線,從 Demo 來看,這條路是行得通的。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 2 ClickHouse 服務化與自動化中的問題

Map 數據類型:動態 Schema

我們在做整個框架的過程中發現,有時候產品存在動態 Schema 的需求。我們當時增加了 Map 的數據類型,主要解決的問題是產品支持的 APP 很多,上報的 Model 也是多變的,它跟用戶的日誌定義有關,有很多用戶自定義參數,就相當於動態的 Schema。從數據產品設計的角度來看又需要相對固定的 Schema,二者之間就會存在一定的鴻溝。最終我們是通過 Map 類型來解決的。

實現 Map 的方式比較多,最簡單的就是像 LOB 的方式,或者像 Two-implicit column 的方式。當時產品要求訪問 Map 單鍵的速度與普通的 column 速度保持一致,那麼比較通用的解決方案不一定能夠滿足我們的要求。當時做的時候,從數據的特徵來看,我們發現雖然叫 Map,但是它的 keys 總量是有限的,因為依賴於用戶自定義的參數不會特別多,在一定的時間範圍內,Keys 數量會是比較固定的。而 ClickHouse 有一個好處:它的數據在局部是自描述的,Part 之間的數據差異自動能夠 Cover 住。

最後我們採用了一個比較簡單的展平模型,在我們數據寫入過程中,它會做一個局部打平。以圖 3 為例,表格中兩行總共只有三個 key,我們就會在存儲層展開這三列。這三列的描述是在局部描述的,有值的用值填充,沒有值就直接用 N 填充。現在 Map 類型在頭條 ClickHouse 集群的各種服務上都在使用,基本能滿足大多數的需求。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 3 局部 PART level 展平模型(自描述)

另外,為了滿足訪問 key 的高效性,我們在執行層做自動改寫,key 的訪問會直接改寫成對隱私列的訪問。這樣架構會有一個比較大的問題,它對於 Map 列的全值訪問代價比較大,需要從隱式列反構建出全值列。對於這個問題,我們也沒有很好地解決,因為實際上在很多時候我們只關心 key 的訪問效率。

另外一個問題,這是 LSM 架構,存在一個數據合併的過程,合併時可能需要重構 Map。我們為了提高合併的速度,做了一些相應的優化,可以做到無序重構。這些做完後,收益還是比較大的。首先,Table 的 schema 能夠簡化,理論上現在 Table 的定義只需要做幾種技術類型的組合就可以;然後 ETL 構建的邏輯不再需要關注用戶的隱私列參數,可以簡化 ETL 的構建邏輯;最後,對數據的自動化接入幫助也很大。圖 4 是我們優化之後的語法,大家可以看到相對比較簡單。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 4 Map 數據類型 - 動態 Schema 相關語法

大數據量和高可用

不知道大家在使用 ClickHouse 的過程中有沒有一個體會,它的高可用方案在大的數據量下可能會有點問題。主要是 zookeeper 的使用方式可能不是很合理,也就是說它原生的 Replication 方案有太多的信息存在 ZK 上,而為了保證服務,一般會有一個或者幾個副本,在頭條內部主要是兩個副本的方案。

我們當時有一個 400 個節點的集群,還只有半年的數據。突然有一天我們發現服務特別不穩定,ZK 的響應經常超時,table 可能變成只讀模式,發現它 znode 的太多。而且 ZK 並不是 Scalable 的框架,按照當時的數據預估,整個服務很快就會不可用了。

我們分析後得出結論,實際上 ClickHouse 把 ZK 當成了三種服務的結合,而不僅把它當作一個 Coordinate service,可能這也是大家使用 ZK 的常用用法。ClickHouse 還會把它當作 Log Service,很多行為日誌等數字的信息也會存在 ZK 上;還會作為表的 catalog service,像表的一些 schema 信息也會在 ZK 上做校驗,這就會導致 ZK 上接入的數量與數據總量會成線性關係。按照這樣的數據增長預估,ClickHouse 可能就根本無法支撐頭條抖音的全量需求。

社區肯定也意識到了這個問題,他們提出了一個 mini checksum 方案,但是這並沒有徹底解決 znode 與數據量成線性關係的問題。所以我們就基於 MergeTree 存儲引擎開發了一套自己的高可用方案。我們的想法很簡單,就是把更多 ZK 上的信息卸載下來,ZK 只作為 coordinate Service。只讓它做三件簡單的事情:行為日誌的 Sequence Number 分配、Block ID 的分配和數據的元信息,這樣就能保證數據和行為在全局內是唯一的。

關於節點,它維護自身的數據信息和行為日誌信息,Log 和數據的信息在一個 shard 內部的副本之間,通過 Gossip 協議進行交互。我們保留了原生的 multi-master 寫入特性,這樣多個副本都是可以寫的,好處就是能夠簡化數據導入。圖 6 是一個簡單的框架圖。

以這個圖為例,如果往 Replica 1 上寫,它會從 ZK 上獲得一個 ID,就是 Log ID,然後把這些行為和 Log Push 到集群內部 shard 內部活著的副本上去,然後當其他副本收到這些信息之後,它會主動去 Pull 數據,實現數據的最終一致性。我們現在所有集群加起來 znode 數不超過三百萬,服務的高可用基本上得到了保障,壓力也不會隨著數據增加而增加。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 5 zookeeper 使用問題

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 6 HaMergeTree 簡單框架

解決了以上幾個問題之後,我們還在對 ClickHouse 做持續改進。我們最近也碰到了一些 Log 調度之類的問題,當時我們對 Log 調度並沒有做特別的優化,實際上還是用 ClickHouse 的原生調度,在有些集群上可能會碰到一些問題,比如有些表的 Log 調度延遲會比較高一點,我們現在也正在嘗試解決。

String 類型處理效率:Global Dictionary

另外,為了滿足交互式的需求,在相當長的一段時間我們都在思考怎麼提高數據執行的性能。大家在做數倉或者做大數據場景的時候會發現,用戶特別喜歡字符串類型,但是你如果做執行引擎執行層,就特別不喜歡處理這類 String 類型的數據,因為它是變長的,存在執行上有較高代價。String 類型的處理效率,跟數字類型的處理效率有 10 倍的差距,所以我們做了一個全局字典壓縮的解決方案,目的並不是為了節省存儲空間,而是為了提高執行的效率,這是相當重要一個出發點。我們希望把一些常見的算子儘量在壓縮域上執行,不需要做數據的解壓。

目前我們只做了一個 pure dictionary compression,支持的算子也比較少,比如 predication 支持等值比較或者 in 等類似的比較能夠在壓縮域上直接執行,這已經能夠覆蓋我們很多的場景,像 group by 操作也能夠在壓縮域上做。

說到 Global Dictionary,其實也並不是完全的 Global ,每個節點有自己的 Dictionary,但是在一個集群內部,各個節點之前的字典可能是不一樣的。為什麼沒有做全局在集群內部做一個字典?

第一,全局字典會把 coordinate 協議搞得特別複雜,我以前做數據庫的時候有個項目,採用了集群級別 Global Dictionary,碰到了比較多的挑戰。字典壓縮只支持了 MergeTree 相關的存儲引擎。壓縮的行為發生主要有三種操作,像數據的插入或者數據的後臺合併,都會觸發 compression,還有很多數據的批量 roll in 或 roll out,也會做一些字典的異步構建。

剛才也提到,我們的主要出發點就是想在執行層去做非解壓的計算,主要是做 Select query,每一個 Select 來的時候,我們都會在分析階段做一些合法性的校驗,評估其在壓縮域上直接執行是否可行,如果滿足標準,就會改寫語法樹。如果壓縮的 column 會出現在輸出的列表中,會顯式地加一個 Decompress Stream 這樣可選的算子,然後後續執行就不太需要改動,而是可以直接支持。當 equality 的比較以及 group by 操作直接在壓縮上執行,最後整體的收益大概提高 20% 到 30%。

剛才提到,我們的字典不是一個集群水平的,那大家可能會有所疑問,比如對分佈式表的 query 怎麼在壓縮域上做評估?我們稍微做了一些限制,很多時候使用壓縮場景的是用戶行為分析系統,它是按用戶 ID 去做 shard,然後節點之間基本做到沒有交互。我們也引入了一個執行模式,稍微在它的現有計算上改了一下,我們叫做完美分佈加智能合併的模式。在這個模式下,分佈式表的 query 也是能夠在字典上做評估。收益也還可以,滿足當時設計時候的要求。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 7 壓縮域執行

特定場景內存 OOM

有時候做一個系統,內存使用的問題也會比較嚴重。尤其當做數據量大的系統時,經常發生內存受限的問題,或者說 OOM 最後被系統殺掉。ClickHouse 因為有很多數據的加速,比如 Index & mark 文件,信息會在實例啟動的時候加載,整個加載過程非常慢,有時候一個集群起來可能得要半個小時。

雖然我們對這個問題做了一些優化,能夠做到並行加載,但是也得好幾分鐘。如果實例被系統 Kill 了之後,對服務還會有影響,我們的系統經常要回答一些用戶這樣的查詢,例如需要查 60 天內用戶的轉化率或者整個用戶的行為路徑對應的每天轉化率。這種 Block 的操作需要把很多數據從底層撈出來,在時間緯度上進行排序,找出對應的模式。

如果不進行優化,基本上一個 Query 需要使用的內存會超過一百 G,如果稍微併發一下,內存那可能就支撐不了。並且,由於其使用的內存分配器的原因,也很難把內存的實際使用量限制得很準,這就偶爾會發生被系統 Kill 的場景。

我們想從 engine 優化的角度去解決問題,本質上就是 Blocked Aggregator 的操作,它沒有感知到底層的數據分佈。這個 Feature 有點意思,也是我們從數據分佈到執行共同優化的一個嘗試,實現相對來說比較粗糙,但是現在線上也已經開始用了。

它的思路是這樣的,我們的 Aggregator 執行路徑可以由 HINT 來控制,HINT 的生成是由上面的產品生成的,因為產品能夠感知數據分佈,也能夠知道這些指標的語義。HINT 最關鍵的一個作用是把 Blocked Aggregator 局部做到流水線化,比如計算 60 天的指標,它可以生成一個 read planner 控制底層的 reader,每一批處理的是那一部分數據。上層的指標輸出可以把這些信息 aggregate 到對應的地方,做從下向上的執行輸出。最上層的 schedule 流輸出指標可以把每天的計算結果匯聚起來,然後做一個總體的整理,最終就形成一個輸出。

這些優化工作完成以後有了很明顯的收益,與默認沒有開啟的時候相比,系統的內存使用可能會下降 5 倍左右。現在應用場景主要在兩個指標的計算上,像漏斗之類的和計算用戶行為路徑會使用。

最快開源 OLAP 引擎!ClickHouse 在頭條的技術演進

圖 8 特定場景內存 OOM - Step-ed Aggregation

Array 類型處理

下面介紹一下我們怎麼處理 Array 類型,並將它做得更高效。

Array 類型處理的需求主要來自於 AB 實驗的需求。當前我們的系統也會做一些實時 AB 指標的輸出,實驗 ID 在我們系統中以數組的形式存儲。頭條內部的 AB 實驗也比較多,對於一個單條記錄,它可能命中的實驗數會有幾百上千個。有時候我們需要查詢命中實驗的用戶行為是什麼樣的,也就是要做一些 Array hasAny 語義的評估。

從 Array 執行來看,因為它的數據比較長,所以說從數據的反序列化代價以及整個 Array 在執行層的 Block 表示來說不是特別高效,要花相當大的精力去做 Array column 的解壓。而且會在執行層消耗特別大的內存去表示,如果中間發生了 Filter 的話,要做 Block column 過濾,permutation 會帶上 Array,執行起來會比較慢。那我們就需要一個比較好的方式去優化從讀取到執行的流程。

做大數據,可能最有效的優化方式就是怎麼樣做到底層數據的剪枝,

數據少是提高數據處理速度的終極法寶。我們提出了現在的剪枝方法,一個是 Part level,一個是 MRK range level。那有沒有一種針對於 Array column 的剪枝方式?我們做了下面兩個嘗試:

首先做了一個雙尺度的 Bloom Filter,記錄 Array 裡面 Key 的運動情況,實現了 Part level 和細粒度的 MRK range level,做完後在一些小的產品上效果還挺好的,但最後真正在大產品上,像抖音、頭條全量,我們發現 Fill factor 太高,實際上沒太大幫助。之後我們開發了一個 BitMap 索引,基本的想法是把 Array 的表示轉化成 Value 和 Bit 的結合。在執行層改寫,把 has 的評估直接轉換成 get BitMap。

做完之後,我們上線了一兩個產品,在一些推薦的場景上使用。這個方案主要問題就是 BitMap 數據膨脹問題稍微嚴重了一點,最近我們也做了一些優化,可能整體的數據佔用是原始數據的 50% 左右,比如 Array 如果是 1G,可能 Bit map 也會有 500M。我們整個集群的副本策略是一個 1:N 的策略,副本存儲空間比較有壓力,我們現在也沒有大範圍的使用,但效果是很好的,對於評估基本上也會有一、二十倍的提升效果。

其他問題和改進

以上是我今天分享的主要內容,後面的內容相對比較彈性。字節跳動自身的數據源是比較多樣的,我們對其他數據源也做了一些特定的優化。比如我們有少量業務會消費 Kafka,而現在的 Kafka engine 沒有做主備容錯,我們也在上面做了一些高可用的 HaKafka engine 的實現,來做到主備容錯以及一些指定分區消費功能,以滿足一些特定領域的需求。

另外,我們發現它的 update/delete 功能比較弱,而我們有一部分業務場景想覆蓋業務數據庫上面的數據,像 MySQL 上也是會有一些增刪操作的。我們就基於它 Collapse 的功能做了一些設計,去支持輕量級的 update/delete,目前產品還屬於剛起步的階段,但是從測試結果來看,能夠支撐從 MySQL 到 ClickHouse 的遷移,基於 delta 表的方案也是可行的。

我們還做了一些像小文件讀取的問題,提供了一個多尺度分區的方案,但由於各種原因沒有在線上使用。

說到底,我們的需求還有很多,現在也還有很多工作正在做,比如控制面的開發,簡化整體運維,還有 Query cache 以及整個數據指標的正確性還不能達到百分之百的保障,特別是像實時數據流的數據,我們也想做更深層次的優化。我們還希望增強物化視圖,也準備提高分佈式 Join 能力,因為我們自研 BI 對此還有比較強的需求,未來我們會在這一塊做一些投入。

以上就是去年一年我們在 ClickHouse 這塊主要做的一些工作。總體來說 ClickHouse 是一個比較短小精幹的引擎,也比較容易上手和定製,大家都可以去嘗試一下。


分享到:


相關文章: