用MongoDB Change Streams 在BigQuery中複製數據

用MongoDB Change Streams 在BigQuery中複製數據

譯者注:

Chang Stream(變更記錄流) 是指collection(數據庫集合)的變更事件流,應用程序通過db.collection.watch()這樣的命令可以獲得被監聽對象的實時變更。BigQuery是Google推出的一項Web服務,該服務讓開發者可以使用Google的架構來運行SQL語句對超級大的數據庫進行操作。

本文將分享:當我們為BigQuery數據管道使用MongoDB變更流構建一個MongoDB時面臨的挑戰和學到的東西。

在講技術細節之前,我們最好思考一下為什麼要建立這個管道。主要有以下兩個原因:

1. 在一定的規模上為了分析而查詢MongoDB是低效的;

2. 我們沒有把所有數據放在MongoDB中(例如分條計費信息)。

在一定的規模上,作為服務供應商的數據管道價格昂貴。通常也不會提供類似軟刪除(例如,使用一個deleted_at字段)這樣的複製刪除記錄的方法。

複製無模式數據

使用MongoDB數據庫是我們要注意的第一件事情就是一些集合有一個需要注意的模式:嵌套文檔,而且其中一些文檔也是數組。

通常,一個嵌套文檔代表一個一對一關係,一個數組是一對多關係。幸運的是Big Query同時支持重複的和嵌套的字段。

根據我們的研究,最常用的複製MongoDB數據的方法是在集合中使用一個時間戳字段。該字段的典型名稱是updated_at,在每個記錄插入和更新時該字段就會更新。使用批處理的方法是很容易實現這種方式的,只需要查詢預期的數據庫即可。當將這種方法運用到我們的數據和集合,我們發現兩個主要的問題:

1. 並非所有我們想要複製的集合都有這個字段。沒有updated_at字段,我們如何知道要複製那些更新的記錄呢?

2. 這種方法不會跟蹤已刪除記錄。我們只是把他們從原始集合中移除了,但永遠不會在Big Query表中進行更新。

幸運的是,MongoDB把對集合產生的所有的變化都記錄在oplog的(oplog是local庫下的一個固定集合)日誌裡面。MongoDB 3.6版本以來,你可以使用變更流API來查詢日誌。這樣,我們就會在集合中發生每個變化(包括刪除操作)時得到警示。

那麼我們的目的就是構建一個管道,該管道可以移動所有變更事件記錄,這些記錄來自一個Big Query表,MongoDB使用每個記錄最近的狀態把流變更為這張表。

構建管道

我們的第一個方法是在Big Query中為每個集合創建一個變更流,該集合是我們想要複製的,並從那個集合的所有變更流事件中獲取方案。這種辦法很巧妙。如果在一個記錄中添加一個新的字段,管道應該足夠智能,以便在插入記錄時修改Big Query表。

由於想要儘可能的在Big Query中獲取數據,我們用了另外一個方法。把所有的變更流事件以JSON塊的形式放在BigQuery中。我們可以使用dbt這樣的把原始的JSON數據工具解析、存儲和轉換到一個合適的SQL表中。這當然有一些缺點,但可以讓我們擁有一個真正及時的端到端管道。管道有以下部件:

1. 一個運行在Kubernetes(是一個開源的,用於管理雲平臺中多個主機上的容器化的應用/(carden,一款開發人員工具)的服務,他可以讀取每個集合的MongoDB變更流,並將其放在一個簡單的Big Query表當中(添加所有的記錄)。

2. 一個讀取帶有增量原始數據的源表並實現在一個新表中查詢的dbt cronjob(dbt,是一個命令行工具,只需編寫select語句即可轉換倉庫中的數據;cronjob,顧名思義,是一種能夠在固定時間運行的Job對象)。這個表中包含了每一行自上一次運行以來的所有狀態。這是一個dbt SQL在生產環境下如何操作的例子。

通過這兩個步驟,我們實時擁有了從MongoDB到Big Query的數據流。我們也可以跟蹤刪除以及所有發生在我們正在複製的表上的變化(這對一些需要一段時間內的變化信息的分析是很有用的)。

由於在MongoDB變更流爬行服務日期之前我們沒有任何數據,所以我們錯失了很多記錄。為了解決這一問題,我們決定通過創建偽變化事件回填數據。我們備份了MongoDB集合,並製作了一個簡單的腳本以插入用於包裹的文檔。這些記錄送入到同樣的BigQuery表中。現在,運行同樣的dbt模型給了我們帶有所有回填記錄的最終表。

我們發現最主要的問題是需要用SQL寫所有的提取操作。這意味著大量額外的SQL代碼和一些額外的處理。當時使用dbt處理不難。另外一個小問題是BigQuery並不天生支持提取一個以JSON編碼的數組中的所有元素。

結論

對於我們來說付出的代價(迭代時間,輕鬆的變化,簡單的管道)是物超所值的。因為我們一開始使用這個管道(pipeline)就發現它對端到端以及快速迭代的所有工作都非常有用!我們用只具有BigQuery增加功能的變更流表作為分隔。未來我們計劃遷移到Apache Beam(是一個統一的編程框架,支持批處理和流處理,並可以將用Beam編程模型構造出來的程序,在多個計算引擎如Apache Apex, Apache Flink, Apache Spark, Google Cloud Dataflow等上運行。)和雲數據流上面,但那些工作要再寫文字說明了。

關於原文作者:David Gasquez,你可以在Twitter上找到他@davidgasquez.

譯者:張衝

對軟件工程、多媒體設計、數據庫編程、程序設計方面有多年的工作經驗。具有較強的網絡管理知識和實踐經驗,現主要從事網絡安全相關工作,興趣是從事大數據分析工作。

用MongoDB Change Streams 在BigQuery中複製數據

進入MongoDB技術交流群請添加運營同學微信 cyqcyl ,添加請備註


分享到:


相關文章: