Spark Streaming|Spark,從入門到精通

歡迎閱讀美圖數據技術團隊的「Spark,從入門到精通」系列文章,本系列文章將由淺入深為大家介紹 Spark,從框架入門到底層架構的實現,相信總有一種姿勢適合你,歡迎大家持續關注:)

注:本文節選自「酷玩 Spark」開源項目,原文地址:https://github.com/lw-lin/CoolplaySpark

Spark Streaming 是批處理的流式實時計算框架,支持從多種數據源獲取數據,如 Kafka、TCP sockets、文件系統等。它可以使用諸如 map、reduce、join 等高級函數進行復雜算法的處理,最後還可以將處理結果存儲到文件系統,數據庫等。

Spark Streaming 有三個特點:

  • 基於 Spark Core Api,因此其能夠與 Spark 中的其他模塊保持良好的兼容性,為編程提供了良好的可擴展性;
  • 粗粒度的準實時處理框架,一次讀取完成,或異步讀完數據之後,再處理數據,且其計算可基於大內存進行,因而具有較高的吞吐量;
  • 採用統一的 DAG 調度以及 RDD,對實時計算有很好的容錯支持;


運行原理


Spark Streaming|Spark,從入門到精通

圖 1

如圖 1 所示是 Spark 的整體架構圖,它主要分為四個模塊:

  • 靜態的 RDD DAG 模版,表示處理邏輯;
  • 動態的工作控制器,將連續的 streaming data 切分為數據片段,並按照模板複製出新的 RDD DAG 的實例,對數據片段進行處理;
  • 原始數據的產生和導入
  • 對長時運行任務的保障,包括輸入數據的失效後的重構和處理任務的失敗後的重調。


DAG 靜態定義

DAG 靜態定義是將整個計算邏輯描述為一個 RDD DAG 的「模版」,在後面 Job 動態生成的時候,針對每個 batch,Spark Streaming 都將根據這個「模板」生成一個 RDD DAG 的實例。

Spark Streaming|Spark,從入門到精通

圖 2

接下來我們瞭解下 RDD 和 DStream 的關係。DStream 維護了對每個產出的 RDD 實例的引用,如圖 2 所示,DStream 在 3 個 batch 裡分別實例化了 3 個 RDD, a[1]、a[2]、a[3],然後 DStream 就保留了 batch 所產出的 RDD 的哈希表。

我們在考慮的時候,可以認為 RDD 加上 batch 維度就是 DStream,DStream 去掉 batch 維度就是 RDD。Spark 定義靜態的計算邏輯後,通過動態的工作控制來調度。

Job 動態生成

在 Spark Streaming 程序的入口我們都會定義一個 batchDuration,即每隔固定時間就比照靜態的 DStreamGraph 來動態生成一個 RDD DAG 實例。在 Spark Streaming 內整體負責動態作業調度的具體類是 JobScheduler,由 start() 運行。

JobScheduler 有兩個非常重要的成員:JobGenerator 和 ReceiverTracker。JobScheduler 將每個 batch 的 RDD DAG 具體生成工作委託給 JobGenerator,而將源頭輸入數據的記錄工作委託給 ReceiverTracker。

Spark Streaming|Spark,從入門到精通

JobGenerator 維護了一個定時器,週期就是上文提到的 batchDuration,定時為每個 batch 生成 RDD DAG 的實例,其中每次 RDD DAG 實際生成包含 5 個步驟:

  • 要求 ReceiverTracker 將目前已收到的數據進行一次分配,即將上個批次切分後的數據,切分到到本次新的批次裡;
  • 要求 DStreamGraph 複製出一套新的 RDD DAG 的實例, DStreamGraph 將要求圖裡的尾 DStream 節點生成具體的 RDD 實例,並遞歸的調用尾 DStream 的上游 DStream 節點……以此遍歷整個 DStreamGraph,遍歷結束也就正好生成了 RDD DAG 的實例;
  • 獲取第 1 步 ReceiverTracker 分配到本 batch 的源頭數據的 meta 信息;
  • 將第 2 步生成的本 batch 的 RDD DAG,和第 3 步獲取到的 meta 信息,一同提交給 JobScheduler 異步執行;
  • 只要提交結束(不管是否已開始異步執行),就馬上對整個系統的當前運行狀態做一個 checkpoint。


數據產生與導入

DStream 的子類 ReceiverInputDStream 在某個 batch 裡實例化 RDD,通過 Receiver 為這個 RDD 生產數據。Spark Streaming 在程序剛開始運行時:

Spark Streaming|Spark,從入門到精通


  • 由 Receiver 的總指揮 ReceiverTracker 分發多個 job,到多個 executor 上分別啟動 ReceiverSupervisor 實例;
  • 每個 ReceiverSupervisor 啟動後將馬上生成一個用戶提供的 Receiver 實現的實例並在 Receiver 實例生成後調用 Receiver.onStart(),這時 Receiver 啟動工作已經運行完畢。


Spark Streaming|Spark,從入門到精通


  • Receiver 在 onStart() 啟動後,就將持續不斷地接收外界數據,並持續交給 ReceiverSupervisor 進行數據轉儲;
  • ReceiverSupervisor 持續不斷地接收到 Receiver 轉來的數據,如果數據很細小,就需要 BlockGenerator 攢多條數據成一塊(4a)、然後再成塊存儲(4b 或 4c);反之就不用攢,直接成塊存儲(4b 或 4c);
  • 每次成塊在 executor 存儲完畢後,ReceiverSupervisor 就會及時上報塊數據的 meta 信息給 driver 端的 ReceiverTracker,這裡的 meta 信息包括數據的標識 id、數據的位置、數據的條數、數據的大小等信息;
  • ReceiverTracker 再將收到的塊數據 meta 信息直接轉給自己的成員 ReceivedBlockTracker,由 ReceivedBlockTracker 專門管理收到的塊數據 meta 信息。

後續在 driver 端,就由 ReceiverInputDStream 在每個 batch 去檢查 ReceiverTracker 收到的塊數據 meta 信息,界定哪些新數據需要在本 batch 內處理,然後生成相應的 RDD 實例去處理這些塊數據。

舉個例子

import org.apache.spark.streaming._
import org.apache.spark.SparkConf
object example{
def main(args:Array[String]):Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")) // DStream transformation
val pairs = words.map(word => (word, 1)) // DStream transformation
val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation
wordCounts.print() // DStream output
ssc.start()
ssc.awaitTermination()
}
}

如以上代碼所示:

  • 啟動 Spark Streamingg 實例後將 batchDuration 設置為 1 秒;
  • ssc.socketTextStream() 將創建一個 SocketInputDStream,這個 InputDStream 的 SocketReceiver 將監聽本機 9999 端口;
  • 接下來幾行利用 DStream transformation 構造出了 lines -> words -> pairs -> wordCounts -> .print() 從lines 到 wordCounts print()的一個 DStreamGraph;
  • 到目前只是是定義好了產生數據的 SocketReceiver 及 DStreamGraph,這些都是靜態的;
  • 下面這行 start() 將在幕後啟動 JobScheduler,進而啟動 JobGenerator 和 ReceiverTracker,其中 JobGenerator 開始不斷的生成一個一個 batch,ReceiverTracker 創建和啟動 Receiver;
  • 然後用戶 code 主線程就 block 在awaitTermination了,block 的效果就是,後臺的 JobScheduler 開始不斷的生成一個一個 batch,也就是在這裡,我們前面靜態定義的 DStreamGraph 的 print(),才一次一次被在 RDD 實例上調用,一次一次打印出當前 batch 的結果;


長時容錯


Spark Streaming|Spark,從入門到精通

首先看 executor 端,在 executor 端 ReceiverSupervisor 和 Receiver 失效後直接重啟即可,關鍵點是保障收到的塊數據的安全,保障了源頭塊數據就能夠保障 RDD DAG (Spark Core 的 lineage)重做。

Spark Streaming 對源頭塊數據的保障,分為 4 個層次,全面、相互補充,又可根據不同場景靈活設置:

  • 熱備:熱備是指在存儲塊數據時,將其存儲到本 executor、並同時 replicate 到另外一個 executor 上去。這樣在一個 replica 失效後,可以立刻無感知切換到另一份 replica 進行計算。實現方式是,在實現自己的 Receiver 時,即指定一下 StorageLevel 為 MEMORY_ONLY_2 或 MEMORY_AND_DISK_2 就可以了。
  • *1.5.2 update 這已經是默認了
  • 冷備:冷備是每次存儲塊數據前,先把塊數據作為 log 寫出到 WriteAheadLog 裡,再存儲到本 executor。executor 失效時,就由另外的 executor 去讀 WAL,再重做 log 來恢復塊數據。WAL 通常寫到可靠存儲如 HDFS 上,所以恢復時可能需要一段 recover time。
  • 重放:如果上游支持重放,比如 Apache Kafka,那麼就可以選擇不用熱備或者冷備來另外存儲數據了,而是在失效時換一個 executor 進行數據重放即可。
  • 忽略:最後,如果應用的實時性需求大於準確性,那麼一塊數據丟失後我們也可以選擇忽略、不恢復失效的源頭數據。


上文曾提到塊數據的 meta 信息上報到 ReceiverTracker,然後交給 ReceivedBlockTracker 做具體的管理。ReceivedBlockTracker 也採用 WAL 冷備方式進行備份,在 driver 失效後,由新的 ReceivedBlockTracker 讀取 WAL 並恢復 block 的 meta 信息。

另外,需要定時對 DStreamGraph 和 JobScheduler 做 Checkpoint,來記錄整個 DStreamGraph 的變化、和每個 batch 的 job 的完成情況。

注意到這裡採用的是完整 checkpoint 的方式,和之前的 WAL 的方式都不一樣。Checkpoint 通常也是落地到可靠存儲如 HDFS。Checkpoint 發起的間隔默認的是和 batchDuration 一致;即每次 batch 發起、提交了需要運行的 job 後就做 Checkpoint,另外在 job 完成了更新任務狀態的時候再次做一下 Checkpoint。

這樣一來,在 driver 失效並恢復後,可以讀取最近一次的 Checkpoint 來恢復作業的 DStreamGraph 和 job 的運行及完成狀態。

Spark Streaming 窗口操作

Spark Streaming|Spark,從入門到精通

Structured Streaming

Structured Streaming 是一種基於 Spark SQL 引擎構建的可擴展且容錯的流處理引擎,它可以以靜態數據表示批量計算的方式來表達流式計算。 Spark SQL 引擎將隨著 streaming data 持續到達而增量地持續地運行,並更新最終結果。

Spark Streaming|Spark,從入門到精通

StreamExecution 的初始狀態

值得注意的是,Structured Streaming 也是先純定義、再觸發執行的模式。前面大部分代碼是純定義 Dataset/DataFrame 的產生、變換和寫出,後面位置再真正 start 一個新線程去觸發執行之前的定義。在新的執行線程裡我們需要持續地去發現新數據,進而持續地查詢最新計算結果至寫出。


Spark Streaming|Spark,從入門到精通

這些 DataFrame的產生、變換和寫出的信息就對應保存在 StreamExecution非常重要的 3 個成員變量中:

  • sources: streaming data 的產生端(如 kafka等);
  • logicalPlan: DataFrame/Dataset 的一系列變換,即計算邏輯;
  • sink: 最終結果寫出的接收端(比如 file system 等)。


Structured Streaming 持續查詢

StreamExecution 通過 Source.getOffset() 獲取最新的 offsets,即最新的數據進度,將 offsets 寫入到 offsetLog 裡,將來可用作故障恢復用。在 3a 將預先定義好的邏輯(即 logicalPlan 成員變量)製作一個副本出來,3b 給定剛剛取到的 offsets,通過 Source.getBatch(offsets) 獲取本執行新收到的數據的 DataFrame 表示。經過這兩步,構造完成的 LogicalPlan 就是針對本執行新收到的數據的 DataFrame 整個處理邏輯。


Spark Streaming|Spark,從入門到精通

接著將表示計算結果的 DataFrame 交給 Sink,6a 通過 Source.commit() 告知 Source 數據已經完整處理結束,6b 將本次執行的批次 id 寫入到 batchCommitLog 裡。

StreamExecution 增量持續查詢

Structured Streaming 在編程模型上暴露給用戶的是每次持續查詢看做面對全量數據,所以每次執行的結果是針對全量數據進行計算的結果,但是在實際執行過程中,由於全量數據會越攢越多,每次對全量數據進行計算的代價和消耗會越來越大。


Spark Streaming|Spark,從入門到精通



因此 Structured Streaming 引入全局範圍、高可用的 StateStore 轉全量為增量,即在每次執行時先從 StateStore 裡 restore 出上次執行後的狀態,再加入本執行的新數據進行計算,如果有狀態改變,將把改變的狀態重新 save 到 StateStore 裡。

所以 Structured Streaming 在具體實現上轉換為增量的持續查詢。

故障恢復

由於 exectutor 節點的故障可由 Spark 框架本身很好的 handle,不引起可用性問題,因此只討論 driver 故障恢復。如果在某個執行過程中發生 driver 故障,那麼重新起來的 StreamExecution 讀取 WAL offsetlog 恢復出最新的 offsets ,並讀取 batchCommitLog 決定是否需要重做最近一個批次。

事件時間

當我們有一系列到達的記錄時,首先對時間列 timestamp 做長度為10m,滑動為5m 的 window() 操作。

Spark Streaming|Spark,從入門到精通

如圖右上角的虛框部分,當達到一條記錄 12:22|dog 時,會將 12:22 歸入兩個窗口 12:15-12:25、12:20-12:30,所以產生兩條記錄:12:15-12:25|dog、12:20-12:30|dog,所以這裡 window() 操作的本質是 explode(),可由一條數據產生多條數據。

接著對 window() 操作的結果,以 window 列和 word 列為 key,做 groupBy() 操作。這個操作的聚合過程是增量的最後得到一個有 window、 word、count 三列的狀態集。

val windowedCounts = words
.withWatermark("timestamp", "10 minutes") // 注意這裡的 watermark 設置!
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()

對於數據延遲通過 withWatermark("timestamp", "10 minutes") 告訴 Structured Streaming,以 timestamp 列的最大值為錨點,往前推 10min 以前的數據不會再接收。

Spark Streaming|Spark,從入門到精通


  • 在 12:20 這個批次結束後,錨點變成了 12:20|dog,owl 這條記錄的 event time 12:20 ,watermark 變成了 12:20 - 10min = 12:10;
  • 在 12:30 批次結束時,即知道 event time 12:10 以前的數據不再收到了,因而 window 12:00-12:10 的結果也不會再被更新,即可以安全地輸出結果 12:00-12:10|cat|2;
  • 在結果 12:00-12:10|cat|2 輸出以後,State 中也不再保存 window 12:00-12:10 的相關信息,即 State Store 中的此條狀態得到了清理。

接下來看 structured streaming 的輸出模式,complete 輸出模式如同上面的流程,接著主要講另外兩種輸出模式:append 和 update。

Spark Streaming|Spark,從入門到精通

Append 的語義將保證一旦輸出了某條 key,未來就不會再輸出同一個 key。所以,在上圖 12:10 這個批次直接輸出 12:00-12:10|cat|1, 12:05-12:15|cat|1 將是錯誤的,因為在 12:20 將結果更新為了12:00-12:10|cat|2,但是 Append 模式下卻不會再次輸出 12:00-12:10|cat|2,因為前面輸出過了同一條 key 12:00-12:10|cat 的結果12:00-12:10|cat|1。

為了解決這個問題,在 Append 模式下 Structured Streaming 需要知道某一條 key 的結果什麼時候不會再更新了,當確認結果不會再更新的時候就可以將結果進行輸出。

Spark Streaming|Spark,從入門到精通

如上圖所示,如果我們確定 12:30 這個批次以後不會再有對 12:00-12:10 這個 window 的更新,那麼我們就可以把 12:00-12:10 的結果在 12:30 這個批次輸出,並且也會保證後面的批次不會再輸出 12:00-12:10 的 window 的結果,維護了 Append 模式的語義。

Update 模式是在 Spark 2.1.1 及以後版本獲得正式支持。

Spark Streaming|Spark,從入門到精通

如上圖所示,在 Update 模式中,只有本執行批次 State 中被更新了的條目會被輸出:

  • 在 12:10 這個執行批次,State 中全部 2 條都是新增的(因而也都是被更新了的),所以輸出全部 2 條;
  • 在 12:20 這個執行批次,State 中 2 條是被更新了的、 4 條都是新增的(因而也都是被更新了的),所以輸出全部 6 條;
  • 在 12:30 這個執行批次,State 中 4 條是被更新了的,所以輸出 4 條。這些需要特別注意的一點是,如 Append 模式一樣,本執行批次中由於(通過 watermark 機制)確認 12:00-12:10 這個 window 不會再被更新,因而將其從 State 中去除,但沒有因此產生輸出。


分享到:


相關文章: