基於Spark、NoSQL的實時數據處理實踐

基於Spark、NoSQL的實時數據處理實踐

本文基於TalkingData 張學敏 在公司內部KOL的分享主題《基於Spark、NoSQL實時數據處理實踐》的整理,同時也在DTCC大會上做了同主題的分享。

主要介紹了項目的技術選型技術架構,重點介紹下項目面臨的挑戰和解決辦法,還介紹了面對多維度多值多版本等業務場景時,使用BitmapHBase特性解決問題方法。

共分為上下兩篇,本次發佈上篇,下篇敬請關注。

一、數據相關情況

基於Spark、NoSQL的實時數據處理實踐

基於Spark、NoSQL的實時數據處理實踐

數據主要都和智能設備相關,包含的數據內容主要可以分為三部分,一部分是設備信息類,主要包括設備ID,比如Mac、IDFA等,還有設備的軟硬件信息,比如操作系統版本號,屏幕分辨率等。另一部分是業務相關信息類,主要包括業務事件,會話信息,還有行為狀態。關於行為狀態,是我們在智能設備上使用算法推測終端持有者的行為狀態信息,比如靜止、行走、奔跑、乘車等。第三部分是上下文信息,包括設備連接網絡的情況,使用的是蜂窩網絡還是WiFi等,還有設備位置相關的信息,以及其他傳感器相關的數據等。

基於Spark、NoSQL的實時數據處理實踐

關於設備體量,目前設備日活月活分別在2.5億和6.5億以上,每天的事件數在370億左右,一天數據的存儲量是在17T左右。

基於Spark、NoSQL的實時數據處理實踐

上圖為整體的數據架構圖,數據流向是自下往上。數據採集層使用的是TalkingData自研的SDK,通過SDK將數據發往數據收集層。數據收集層使用的是TalkingData自研的DataCollector,Collector會將數據發送到數據接入層的Kafka。每個業務線都有自己的Kafka集群,在Collector可以控制數據的流向,大多數據都是業務線一份,數據中心一份。數據處理層有兩部分,一部分是使用Spark core或sql的離線計算。其中Spark是on yarn模式,使用yarn進行資源管理,中間通過Alluxio進行加速,使用Jenkins進行作業管理和調度,主要負責為業務方提供數據集和數據服務。

另一部分是使用Spark Streaming的實時計算,主要是為TalkingData管理層提供運營數據報表。數據存儲層,主要功能是存放數據處理後的結果,使用分佈式文件系統HDFS、Alluxio存放數據集,使用分佈式數據庫HBase、ScyllaDB,關係型數據庫MySQL以及MPP型數據庫GreenPlum存放服務相關的數據。數據應用層東西就比較多了,有供TalkingData內部使用的數據分析、探索平臺,也有對外內外都可的數據服務、數據模型商城,以及智能營銷雲、觀象臺等。

二、項目面臨的業務訴求

基於Spark、NoSQL的實時數據處理實踐

主要的可總結為四部分:

  • 首先是數據修正:離線計算是將數據存放在了HDFS上,如果數據有延遲,比如事件時間是昨天的數據今天才到,那麼數據將會被錯誤的存放在今天的時間分區內。因為HDFS不支持隨機讀寫,也不好預測數據會延遲多久,所以在離線計算想要完全修正這些數據,成本還是比較高的。
  • 其次是時序數據需求:之前的業務都是以小時、天、周、月等時間週期,面向時間斷面 的宏觀數據分析,隨著公司業務擴展,比如營銷、風控等行業,面向個體的微觀數據分析的需求越來越多,所以需要能夠低成本的把一個設備的相關的數據都取出來做分析。而面向時間斷面的數據每天十幾T,想從中抽出某些設備近1個月的數據就會涉及到500多T的數據。所以需要建立時序數據處理、查詢的能力,能方便的獲取設備歷史上所有數據。
  • 第三是實時處理:離線計算少則延遲一個小時,多則一天或者更久,而有些行業對數據時效性要求是比較高的,比如金融、風控等業務,所以需要實時數據處理。同時,為了更多的豐富設備位置相關數據,我們還建立了WiFi、基站等實體的位置庫,所以在實時數據處理時,需要實時讀取這些庫為那些連接了WiFi、基站但沒位置數據的設備補充位置相關信息。
  • 第四是實時查詢,這裡描述的是面向實體、多維度、多值、多版本,接下來我詳細介紹下。
基於Spark、NoSQL的實時數據處理實踐

我們將事件數據抽象出了各種實體,比如設備、位置、WiFi基站等實體,其中位置實體可以使用GeoHash或者網格表達。每個實體都有唯一ID以及多個維度信息,以設備實體為例,包括ID、軟硬件信息等維度。單個維度又可能會包含多個值,比如WiFi,在家我連接的是WiFi1,到公司鏈接的是WiFi2,所以WiFi維度有WiFi1和WiFi2兩個值。單個值又可能有多個時間版本,比如我在家連接WiFi1可能6點被捕獲到一次,7點被捕獲到兩次。所以,最終建立可以通過指定實體ID,查詢維度、列及時間窗口獲取數據的能力。

三、技術選型和架構

基於Spark、NoSQL的實時數據處理實踐

數據接入層我們選擇的是Kafka,Kafka在大數據技術圈裡出鏡率還是比較高的。Kafka是LinkedIn在2011年開源的,創建初衷是解決系統間消息傳遞的問題。傳統消息系統有兩種模型,一種是隊列模型,一種是訂閱發佈模型。兩者各有優缺,比如隊列模型的消息系統可以支持多個客戶端同時消費不同的數據,也就是可以很方便的擴展消費端的能力,但訂閱發佈模型就不好擴展,因為它是使用的廣播模式。另一個就是,隊列模型的消息只能被消費一次,一旦一個消息被某個消費者處理了,其他消費者將不能消費到該消息,而發佈訂閱模型同一消息可以被所有消費者消費到。Kafka使用Topic分類數據,一個Topic類似一個消息隊列。Kafka還有個概念,叫consumer group,一個group裡可以有多個消費者,同一個topic可以被一個group內的多個消費者同時消費不同的消息,也就是類似隊列模型可以方便的擴展消費端能力。一個Topic也可以被多個group消費,group之間相互沒有影響,也就是類似發佈訂閱模型,Topic中的一條消息可以被消費多次。所以Kafka等於說是使用Topic和Consumer group等概念,將隊列模型和訂閱發佈模型的優勢都糅合了進來。

現在Kafka官方將Kafka的介紹做了調整,不再滿足大家簡單的將其定位為消息隊列,新的介紹描述是:可以被用來創建實時數據管道和流式應用,且具有可擴展、高容錯,高吞吐等優勢。另外,經過7年的發展,kafka也比較成熟了,與周邊其他組件可以很方便的集成。但目前也有兩個比較明顯的劣勢,一個是不能保證Topic級別的數據有序,另一個是開源的管理工具不夠完善。

基於Spark、NoSQL的實時數據處理實踐

Spark現在聽起來不像前幾年那麼性感了,但因為我們離線計算使用的Spark,有一定的技術積累,所以上手比較快。另外,Spark Streaming並不是真正意義上的流式處理,而是微批,相比Storm、Flink延遲還是比較高的,但目前也能完全滿足業務需求,另外,為了技術統一,資源管理和調度統一,所以我們最終選用了Spark Streaming。

Spark Streaming是Spark核心API的擴展,可實現高擴展、高吞吐、高容錯的實時流數據處理應用。支持從Kafka、Flum、HDFS、S3等多種數據源獲取數據,並根據一定的時間間隔拆分成一批批的數據,然後可以使用map、reduce、join、window等高級函數或者使用SQL進行復雜的數據處理,最終得到處理後的一批批結果數據,其還可以方便的將處理結果存放到文件系統、數據庫或者儀表盤,功能還是很完善的。

Spark Streaming將處理的數據流抽象為Dstream,DStream本質上表示RDD的序列,所以任何對DStream的操作都會轉變為對底層RDD的操作。

基於Spark、NoSQL的實時數據處理實踐

HBase是以分佈式文件系統HDSF為底層存儲的分佈式列式數據庫,它是對Google BigTable開源的實現,主要解決超大規模數據集的實時讀寫、隨機訪問的問題,並且具有可擴展、高吞吐、高容錯等優點。HBase這些優點取決於其架構和數據結構的設計,他的數據寫入並不是直接寫入文件,當然HDFS不支持隨機寫入,而是先寫入被稱作MemStore的內存,然後再異步刷寫至HDFS,等於是將隨機寫入轉換成了順序寫,所以大多時候寫入速度高並且很穩定。

而讀數據快,是使用字典有序的主鍵RowKey通過Zookeeper先定位到數據可能所在的RegionServer,然後先查找RegionServer的讀緩存BlockCache,如果沒找到會再查MemStore,只有這兩個地方都找不到時,才會加載HDFS中的內容,但因為其使用了LSM樹型結構,所以讀取耗時一般也不長。還有就是,HBase還可以使用布隆過濾器通過判存提高查詢速度。

HBase的缺點一個是運維成本相對較高,像compact、split、flush等問題處理起來都是比較棘手的,都需要不定期的投入時間做調優。還有個缺點是延遲不穩定,影響原因除了其copmact、flush外還有JVM的GC以及緩存命中情況。

基於Spark、NoSQL的實時數據處理實踐

ScyllaDB算是個新秀,可以與Cassandra對比了解,其實它就是用C++重寫的Cassandra,客戶端完全與Cassandra兼容,其官網Benchmark對標的也是Cassandra,性能有10倍以上的提升,單節點也可以每秒可以處理100萬TPS,整體性能還是比較喜人的。與HBase、Cassandra一樣也有可擴展、高吞吐、高容錯的特點,另外他的延遲也比較低,並且比較穩定。

他和Cassandra與HBase都可以以做到CAP理論裡的P,即保證分區容忍性,也就是在某個或者某些節點出現網絡故障或者系統故障時候,不會影響到整個DataBase的使用。而他倆與HBase不一樣的一個地方在於分區容忍性包證的情況下,一致性與高可用的取捨,也就是CAP理論裡,在P一定時C與A的選擇。HBase選擇的是C,即強一致性,比如在region failover 及後續工作完成前,涉及的region的數據是不能讀取的,而ScyllaDB、Cassandra選擇的A,即高可用的,但有些情況下數據可能會不一致。所以,選型時需要根據業務場景來定。

ScyllaDB的劣勢也比較明顯,就是項目比較新,Bug和使用的坑比較多, 我在這裡就不一一去說了。

基於Spark、NoSQL的實時數據處理實踐

前面分別簡單介紹了選定的技術組件,及他們的優缺點,最終項目整體架構如上圖所示,數據流向用灰色箭頭代表,數據採集和收集都與離線計算一樣,不同的是在Spark Streaming從Kafka消費數據時,會同時實時從ScyllaDB讀取wifi、基站定位庫的數據參與位置補充的計算,然後將處理的結果數據寫入HBase。再往下類似Lambda架構,會對HBase中的數據離線做進一步的處理,然後再將數據離線通過Bulkload方式寫入HBase,關於其中的Bitmap應用,後邊再聊。

架構右邊部分是服務相關的,首先是中間件,主要屏蔽了異構數據庫對應用層服務的影響,再往上是規則引擎服務,因為我們上線在SDMK的應用服務有100多個,導致服務管理成本很高,並且也不利於物理資源的合理運用,所以上線了規則引擎服務,將所有服務的業務邏輯都通過規則表達,這樣上線新服務就不需要重新申請服務器,只需要添加一條規則即可。等於是就將一百多個服務轉換成了一個服務,當規則引擎負載較高時或者大幅降低後,可以很方便的進行資源的擴充和減少。SDMK是TalkingData研發的類似淘寶的交易平臺,公司內、外的數據服務、數據模型都可以像商品一樣在上面進行售賣。


分享到:


相關文章: