MongoDB + Spark: 完整的大數據解決方案

Spark介紹

按照官方的定義,Spark 是一個通用,快速,適用於大規模數據的處理引擎。

  • 通用性:我們可以使用Spark SQL來執行常規分析, Spark Streaming 來來做流數據處理, 以及用Mlib來執行機器學習等。Java,python,scala及R語言的支持也是其通用性的表現之一。
  • 快速: 這個可能是Spark成功的最初原因之一,主要歸功於其基於內存的運算方式。當數據的處理過程需要反覆迭代時,Spark可以直接在內存中暫存數據,而無需像MapReduce一樣需要把數據寫回磁盤。官方的數據表明:它可以比傳統的MapReduce快上100倍。
  • 大規模:原生支持HDFS,並且其計算節點支持彈性擴展,利用大量廉價計算資源併發的特點來支持大規模數據處理。


我們能用它做什麼

那我們能用Spark來做什麼呢? 場景數不勝數。

最簡單的可以只是統計一下某一個頁面多少點擊量,複雜的可以通過機器學習來預測趨勢。

個性化 是一個常見的案例,比如說,Yahoo的網站首頁使用Spark來實現快速的用戶興趣分析。應該在首頁顯示什麼新聞?原始的做法是讓用戶選擇分類,聰明的做法就是在用戶交互的過程中揣摩用戶可能喜歡的文章;另一方面就是要在新聞進來時候進行分析並確定什麼樣的用戶是可能的受眾。新聞的時效性非常高,按照常規的MapReduce做法,對於Yahoo幾億用戶及海量的文章,可能需要計算一天才能得出所有結果。Spark的高效運算可以在這裡得到充分的運用,來保證新聞推薦在數十分鐘或更短時間內完成。另外,美國最大的有線電視商Comcast用它來做節目推薦,最近剛和滴滴聯姻的Uber用它做實時訂單分析,優酷則在Spark上實現了商業智能的升級版

Spark生態系統

在我們開始談MongoDB 和Spark 之前,我們首先來了解一下Spark的生態系統。 Spark 作為一個大型分佈式計算框架,需要和其他組件一起協同工作。

MongoDB + Spark: 完整的大數據解決方案


在Hdaoop裡面,HDFS作為一個數據層位於其核心部位。

Spark是Hadoop生態系統的一顆新星,原生就支持HDFS。大家知道HDFS是用來管理大規模非結構化數據的存儲系統,具有高可用和巨大的橫向擴展能力。

而作為一個橫向擴展的分佈式集群,資源管理是其核心必備的能力,Spark 可以通過YARN或者MESOS來負責資源(CPU)分配和任務調度。如果你不需要Spark管理節點的高可用,你也可以直接使用Spark standalone。

在有了數據層和資源管理層後, 接下來就是我們真正的計算引擎了。

Hadoop技術的兩大基石之一的MapReduce就是用來實現集群大規模並行計算。而現在就多了一個選項:Spark。 MapReduce的特點是,用4個字來概括,簡單粗暴。採用divide & conquer戰術,我們可以用MapReduce來處理PB級的數據。 而Spark 作為打了雞血的MapReduce增強版,利用了內存價格大量下降的時代因素,充分把計算所用變量和中間結果放到內存裡,並且提供了一整套機器學習的分析算法,在加上很多語言的支持,使之成為一個較之於MapReduce更加優秀的選擇。

由於MapReduce 是一個相對並不直觀的程序接口,所以為了方便使用,一系列的高層接口如Hive或者Pig應運而生。 Hive可以讓我們使用非常熟悉的SQL語句的方式來做一些常見的統計分析工作。同理,在Spark 引擎層也有類似的封裝,如Spark SQL、 RDD以及2.0版本新推出的Dataframe等。

所以一個完整的大數據解決方案,包含了存儲,資源管理,計算引擎及接口層。 那麼問題來了:我們畫了這麼大這麼圓的大餅,MongoDB可以吃哪一塊呢?

MongoDB + Spark: 完整的大數據解決方案


大家可以想象,MongoDB是個什麼?是個database。 所以自然而然,MongoDB可以擔任的角色,就是數據存儲的這一部分。在和 Spark一起使用的時候,MongoDB就可以扮演HDFS的角色來為Spark提供計算的原始數據,以及用來持久化分析計算的結果。

HDFS vs. MongoDB

既然我們說MongoDB可以用在HDFS的地方,那我們來詳細看看兩者之間的差異性。

在說區別之前,其實我們可以先來注意一下兩者的共同點。HDFS和MongoDB都是基於廉價x86服務器的橫向擴展架構,都能支持到TB到PB級的數據量。數據會在多節點自動備份,來保證數據的高可用和冗餘。兩者都支持非結構化數據的存儲,等等。

但是,HDFS和MongoDB更多的是差異點:

  • 如在存儲方式上 HDFS的存儲是以文件為單位,每個文件64MB到128MB不等。而MongoDB則是細顆粒化的、以文檔為單位的存儲。
  • HDFS不支持索引的概念,對數據的操作侷限於掃描性質的讀,MongoDB則支持基於二級索引的快速檢索。
  • MongoDB可以支持常見的增刪改查場景,而HDFS一般只是一次寫入後就很難進行修改。
  • 從響應時間上來說,HDFS一般是分鐘級別而MongoDB對手請求的響應時間通常以毫秒作為單位。


一個日誌的例子

如果說剛才的比較有些抽象,我們可以結合一個實際一點的例子來理解。

比如說,一個比較經典的案例可能是日誌記錄管理。在HDFS裡面你可能會用日期範圍來命名文件,如7月1日,7月2日等等,每個文件是個日誌文本文件,可能會有幾萬到幾十萬行日誌。

而在MongoDB裡面,我們可以採用一個JSON的格式,每一條日誌就是一個JSON document。我們可以對某幾個關心的字段建索引,如時間戳,錯誤類型等。

我們來考慮一些場景,加入我們相對7月份所有日誌做一些全量的統計,比如每個頁面的所有點擊量,那麼這個HDFS和MongoDB都可以正常處理。

如果有一天你的經理告訴你:他想知道網站上每天有多少404錯誤在發生,這個時候如果你用HDFS,就還是需要通過全量掃描所有行,而MongoDB則可以通過索引,很快地找到所有的404日誌,可能花數秒鐘就可以解答你經理的問題。

又比如說,如果你希望對每個日誌項加一個自定義的屬性,在進行一些預處理後,MongoDB就會比較容地支持到。而一般來說,HDFS是不支持更新類型操作的。

好的我們瞭解了MongoDB為什麼可以替換HDFS並且為什麼有這個必要來做這個事情,下面我們就來看看Spark和MongoDB怎麼玩!

Spark + MongoDB

Spark的工作流程可以概括為三部曲:創建併發任務,對數據進行transformation操作,如map, filter,union,intersect等,然後執行運算,如reduce,count,或者簡單地收集結果。

MongoDB + Spark: 完整的大數據解決方案


這裡是Spark和MongoDB部署的一個典型架構。

Spark任務一般由Spark的driver節點發起,經過Spark Master進行資源調度分發。比如這裡我們有4個Spark worker節點,這些節點上的幾個executor 計算進程就會同時開始工作。一般一個core就對應一個executor。

每個executor會獨立的去MongoDB取來原始數據,直接套用Spark提供的分析算法或者使用自定義流程來處理數據,計算完後把相應結果寫回到MongoDB。

我們需要提到的是:在這裡,所有和MongoDB的交互都是通過一個叫做Mongo-Spark的連接器來完成的。

MongoDB + Spark: 完整的大數據解決方案


另一種常見的架構是結合MongoDB和HDFS的。Hadoop在非結構化數據處理的場景下要比MongoDB的普及率高。所以我們可以看到不少用戶會已經將數據存放在HDFS上。這個時候你可以直接在HDFS上面架Spark來跑,Spark從HDFS取來原始數據進行計算,而MongoDB在這個場景下是用來保存處理結果。為什麼要這麼麻煩?幾個原因:

  • Spark處理結果數量可能會很大,比如說,個性化推薦可能會產生數百萬至數千萬條記錄,需要一個能夠支持每秒萬級寫入能力的數據庫
  • 處理結果可以直接用來驅動前臺APP,如用戶打開頁面時獲取後臺已經為他準備好的推薦列表。


Mongo Spark Connector 連接器

在這裡我們在介紹下MongoDB官方提供的Mongo Spark連接器 。目前有3個連接器可用,包括社區第三方開發的和之前Mongo Hadoop連接器等,這個Mong Spark是最新的,也是我們推薦的連接方案。

MongoDB + Spark: 完整的大數據解決方案


這個連接器是專門為Spark打造的,支持雙向數據,讀出和寫入。但是最關鍵的是 條件下推,也就是說:如果你在Spark端指定了查詢或者限制條件的情況下,這個條件會被下推到MongoDB去執行,這樣可以保證從MongoDB取出來、經過網絡傳輸到Spark計算節點的數據確實都是用得著的。沒有下推支持的話,每次操作很可能需要從MongoDB讀取全量的數據,性能體驗將會很糟糕。拿剛才的日誌例子來說,如果我們只想對404錯誤日誌進行分析,看那些錯誤都是哪些頁面,以及每天錯誤頁面數量的變化,如果有條件下推,那麼我們可以給MongoDB一個限定條件:錯誤代碼=404, 這個條件會在MongoDB服務器端執行,這樣我們只需要通過網絡傳輸可能只是全部日誌的0.1%的數據,而不是沒有條件下推情況下的全部數據。

另外,這個最新的連接器還支持和Spark計算節點Co-Lo 部署。就是說在同一個節點上同時部署Spark實例和MongoDB實例。這樣做可以減少數據在網絡上的傳輸帶來的資源消耗及時延。當然,這種部署方式需要注意內存資源和CPU資源的隔離。隔離的方式可以通過Linux的cgroups。

Spark + MongoDB 成功案例

目前已經有很多案例在不同的應用場景中使用Spark+MongoDB。

法國航空是法國最大的航空公司,為了提高客戶體驗,在最近施行的360度客戶視圖中,使用Spark對已經收集在MongoDB裡面的客戶數據進行分類及行為分析,並把結果(如客戶的類別、標籤等信息)寫回到MongoDB內每一個客戶的文檔結構裡。

Stratio是美國硅谷一家著名的金融大數據公司。他們最近在一家在31個國家有分支機構的跨國銀行實施了一個實時監控平臺。該銀行希望通過對日誌的監控和分析來保證客戶服務的響應時間以及實時監測一些可能的違規或者金融欺詐行為。在這個應用內, 他們使用了:

  • Apache Flume 來收集log
  • Spark來處理實時的log
  • MongoDB來存儲收集的log以及Spark分析的結果,如Key Performance Indicators等


在我們國內,東方航空最近剛完成了 一個Spark運價的POC測試。下面我們來看看他們做的事情。

東方航空的挑戰

東方航空作為國內的3大行之一,每天有1000多個航班,服務26萬多乘客。過去,顧客在網站上訂購機票,平均資料庫查詢200次就會下單訂購機票,但是現在平均要查詢1.2萬次才會發生一次訂購行為,同樣的訂單量,查詢量卻成長百倍。按照50%直銷率這個目標計算,東航的運價系統要支持每天16億的運價請求。

思路:空間換時間

當前的運價是通過實時計算的,按照現在的計算能力,需要對已有系統進行100多倍的擴容。另一個常用的思路,就是採用空間換時間的方式。與其對每一次的運價請求進行耗時300ms的運算,不如事先把所有可能的票價查詢組合窮舉出來並進行批量計算,然後把結果存入MongoDB裡面。當需要查詢運價時,直接按照 出發+目的地+日期的方式做一個快速的DB查詢,響應時間應該可以做到幾十毫秒。

那為什麼要用MongoDB?因為我們要處理的數據量龐大無比。按照1000多個航班,365天,26個倉位,100多渠道以及數個不同的航程類型,我們要實時存取的運價記錄有數十億條之多。這個已經遠遠超出常規RDBMS可以承受的範圍。

MongoDB基於內存緩存的數據管理方式決定了對併發讀寫的響應可以做到很低延遲,水平擴展的方式可以通過多臺節點同時併發處理海量請求。

事實上,全球最大的航空分銷商,管理者全世界95%航空庫存的Amadeus也正是使用MongoDB作為其1000多億運價緩存的存儲方案。

Spark + MongoDB 方案

我們知道MongoDB可以用來做我們海量運價數據的存儲方案,在大規模並行計算方案上,就可以用到嶄新的Spark技術。

MongoDB + Spark: 完整的大數據解決方案


這裡是一個運價系統的架構圖。 左邊是發起航班查詢請求的客戶端,首先會有API服務器進行預處理。一般航班請求會分為庫存查詢和運價查詢。庫存查詢會直接到東航已有的庫存系統(Seat Inventory),同樣是實現在MongoDB上面的。在確定庫存後根據庫存結果再從Fare Cache系統內查詢相應的運價。

Spark集群則是另外一套計算集群,通過Spark MongoDB連接套件和MongoDB Fare Cache集群連接。Spark 計算任務會定期觸發(如每天一次或者每4小時一次),這個任務會對所有的可能的運價組合進行全量計算,然後存入MongoDB,以供查詢使用。右半邊則把原來實時運算的集群換成了Spark+MongoDB。Spark負責批量計算一年內所有航班所有倉位的所有價格,並以高併發的形式存儲到MongoDB裡面。每秒鐘處理的運價可以達到數萬條。

當來自客戶端的運價查詢達到服務端以後,服務端直接就向MongoDB發出按照日期,出發到達機場為條件的mongo查詢。

批處理計算流程

MongoDB + Spark: 完整的大數據解決方案


這裡是Spark計算任務的流程圖。需要計算的任務,也就是所有日期航班倉位的組合,事先已經存放到MongoDB裡面。

任務遞交到master,然後預先加載所需參考數據,broadcast就是把這些在內存裡的數據複製到每一個Spark計算節點的JVM,然後所有計算節點多線程併發執行,從Mongodb裡取出需要計算的倉位,調用東航自己的運價邏輯,得出結果以後,並保存回MongoDB。

Spark 任務入口程序

Spark和MongoDB的連接使用非常簡單,下面就是一個代碼示例:

MongoDB + Spark: 完整的大數據解決方案


處理能力和響應時間比較

這裡是一個在東航POC的簡單測試結果。從吞吐量的角度,新的API服務器單節點就可以處理3400個併發的運價請求。在顯著提高了併發的同時,響應延遲則降低了10幾倍,平均10ms就可以返回運價結果。按照這個性能,6臺 API服務器就可以應付將來每天16億的運價查詢。

MongoDB + Spark: 完整的大數據解決方案


Spark + MongoDB演示

接下來是一個簡單的Spark+MongoDB演示。

安裝 Spark

# curl -OL http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz# mkdir -p ~/spark# tar -xvf spark-1.6.0-bin-hadoop2.6.tgz -C ~/spark --strip-components=1


測試連接器

# cd ~/spark# ./bin/spark-shell--conf "spark.mongodb.input.uri=mongodb://127.0.0.1/flights.av" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/flights.output" --packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0import com.mongodb.spark._import org.bson.DocumentMongoSpark.load(sc).take(10).foreach(println)


簡單分組統計

數據: 365天,所有航班庫存信息,500萬文檔

任務: 按航班統計一年內所有餘票量

MongoSpark.load(sc) .map(doc=>(doc.getString("flight") ,doc.getLong("seats"))) .reduceByKey((x,y)=>(x+y)) .take(10) .foreach(println)


簡單分組統計加條件過濾

數據: 365天,所有航班庫存信息,500萬文檔

任務: 按航班統計一年內所有庫存,但是隻處理昆明出發的航班

import org.bson.DocumentMongoSpark.load(sc) .withPipeline(Seq(Document.parse("{ $match: { orig : 'KMG' } }"))) .map(doc=>(doc.getString("flight") ,doc.getLong("seats"))) .reduceByKey((x,y)=>(x+y)) .take(10) .foreach(println)


性能優化事項

  • 使用合適的chunksize (MB)
  • Total data size / chunksize = chunks = RDD partitions = spark tasks
  • 不要將所有CPU核分配給Spark
  • 預留1-2個core給操作系統及其他管理進程
  • 同機部署
  • 適當情況可以同機部署Spark+MongoDB,利用本地IO提高性能


總結

上面只是一些簡單的演示,實際上Spark + MongoDB的使用可以通過Spark的很多種形式來使用。我們來總結一下Spark + MongoDB的應用場景。在座的同學可能很多人已經使用了MongoDB,也有些人已經使用了Hadoop。我們可以從兩個角度來考慮這個事情:

對那些已經使用MongoDB的用戶,如果你希望在你的MongoDB驅動的應用上提供個性化功能,比如說像Yahoo一樣為你找感興趣的新聞,能夠在你的MongoDB數據上利用到Spark強大的機器學習或者流處理,你就可以考慮在MongoDB集群上部署Spark來實現這些功能。

如果你已經使用Hadoop而且數據已經在HDFS裡面,你可以考慮使用Spark來實現更加實時更加快速的分析型需求,並且如果你的分析結果有數據量大、格式多變以及這些結果數據要及時提供給前臺APP使用的需求,那麼MongoDB可以用來作為你分析結果的一個存儲方案。



分享到:


相關文章: