大數據技術分享:Spark Streaming 技術點匯總

Spark Streaming支持實時數據流的可擴展(Scalable)、高吞吐(high-throughput)、容錯(fault-tolerant)的流處理(stream processing)。

大數據技術分享:Spark Streaming 技術點彙總

特性如下:

• 可線性伸縮至超過數百個節點;

• 實現亞秒級延遲處理;

• 可與 Spark 批處理和交互式處理無縫集成;

• 提供簡單的API實現複雜算法;

• 更多的流方式支持,包括 Kafka、Flume、Kinesis、Twitter、ZeroMQ 等。


001、原理

Spark 在接收到實時輸入數據流後,將數據劃分成批次(divides the data into batches),然後轉給 Spark Engine 處理,按批次生成最後的結果流(generate the final stream of results in batches)。

大數據技術分享:Spark Streaming 技術點彙總


002、API

DStream:

DStream(Discretized Stream,離散流)是 Spark Stream 提供的高級抽象連續數據流。

組成:一個 DStream 可看作一個 RDDs 序列。

核心思想:將計算作為一系列較小時間間隔的、狀態無關的、確定批次的任務,每個時間間隔內接收的輸入數據被可靠存儲在集群中,作為一個輸入數據集。

大數據技術分享:Spark Streaming 技術點彙總

特性:一個高層次的函數式編程 API、強一致性以及高校的故障恢復。

應用程序模板:

模板1

大數據技術分享:Spark Streaming 技術點彙總

模板2

大數據技術分享:Spark Streaming 技術點彙總

WordCount示例


大數據技術分享:Spark Streaming 技術點彙總

Input DStream:

Input DStream 是一種從流式數據源獲取原始數據流的 DStream,分為基本輸入源(文件系統、Socket、Akka Actor、自定義數據源)和高級輸入源(Kafka、Flume等)。

Receiver:

每個 Input DStream(文件流除外)都會對應一個單一的 Receiver對象,負責從數據源接收數據並存入 Spark 內存進行處理。應用程序中可創建多個 Input DStream 並行接收多個數據流。

每個 Receiver 是一個長期運行在Worker或者 Executor 上的 Task,所以會佔用該應用程序的一個核(core)。如果分配給 Spark Streaming 應用程序的核數小於或等於 Input DStream 個數(即Receiver個數),則只能接收數據,卻沒有能力全部處理(文件流除外,因為無需Receiver)。

Spark Streaming 已封裝各種數據源,需要時參考官方文檔。

Transformation Operation

常用Transformation

大數據技術分享:Spark Streaming 技術點彙總

大數據技術分享:Spark Streaming 技術點彙總

updateStateByKey(func)

updateStateByKey可對DStream中的數據按key做reduce,然後對各批次數據累加

WordCount的updateStateByKey版本

大數據技術分享:Spark Streaming 技術點彙總

transform(func)

通過對原 DStream 的每個 RDD 應用轉換函數,創建一個新的 DStream。

官方文檔代碼舉例

大數據技術分享:Spark Streaming 技術點彙總

Window operations

窗口操作:基於 window 對數據 transformation(個人認為與Storm的tick相似,但功能更強大)。

參數:窗口長度(window length)和滑動時間間隔(slide interval)必須是源DStream 批次間隔的倍數。

舉例說明:窗口長度為3,滑動時間間隔為2;上一行是原始 DStream,下一行是窗口化的 DStream。

大數據技術分享:Spark Streaming 技術點彙總

常見 window operation

大數據技術分享:Spark Streaming 技術點彙總

大數據技術分享:Spark Streaming 技術點彙總

官方文檔代碼舉例

大數據技術分享:Spark Streaming 技術點彙總

join(otherStream, [numTasks])

連接數據流

官方文檔代碼舉例1

大數據技術分享:Spark Streaming 技術點彙總

官方文檔代碼舉例2

大數據技術分享:Spark Streaming 技術點彙總

Output Operation

大數據技術分享:Spark Streaming 技術點彙總

緩存與持久化:

通過 persist()將 DStream 中每個 RDD 存儲在內存。

Window operations 會自動持久化在內存,無需顯示調用 persist()。

通過網絡接收的數據流(如Kafka、Flume、Socket、ZeroMQ、RocketMQ等)執行 persist()時,默認在兩個節點上持久化序列化後的數據,實現容錯。

Checkpoint:

用途:Spark 基於容錯存儲系統(如HDFS、S3)進行故障恢復。

分類:

元數據檢查點:保存流式計算信息用於 Driver 運行節點的故障恢復,包括創建應用程序的配置、應用程序定義的 DStream operations、已入隊但未完成的批次。

數據檢查點:保存生成的 RDD。由於 stateful transformation 需要合併多個批次的數據,即生成的 RDD 依賴於前幾個批次 RDD 的數據(dependency chain),為縮短 dependency chain 從而減少故障恢復時間,需將中間 RDD 定期保存至可靠存儲(如HDFS)。

使用時機:

Stateful transformation:updateStateByKey()以及 window operations。

需要 Driver 故障恢復的應用程序。


003、使用方法

Stateful transformation

大數據技術分享:Spark Streaming 技術點彙總

需要 Driver 故障恢復的應用程序(以WordCount舉例):如果 checkpoint 目錄存在,則根據 checkpoint 數據創建新 StreamingContext;否則(如首次運行)新建 StreamingContext。

大數據技術分享:Spark Streaming 技術點彙總

checkpoint 時間間隔

方法:

大數據技術分享:Spark Streaming 技術點彙總

原則:一般設置為滑動時間間隔的5-10倍。

分析:checkpoint 會增加存儲開銷、增加批次處理時間。當批次間隔較小(如1秒)時,checkpoint 可能會減小 operation 吞吐量;反之,checkpoint 時間間隔較大會導致 lineage 和 task 數量增長。


004、性能調優

降低批次處理時間:

數據接收並行度

增加 DStream:接收網絡數據(如Kafka、Flume、Socket等)時會對數據反序列化再存儲在 Spark,由於一個 DStream 只有 Receiver 對象,如果成為瓶頸可考慮增加 DStream。

大數據技術分享:Spark Streaming 技術點彙總

設置“spark.streaming.blockInterval”參數:接收的數據被存儲在 Spark 內存前,會被合併成 block,而 block 數量決定了Task數量;舉例,當批次時間間隔為2秒且 block 時間間隔為200毫秒時,Task 數量約為10;如果Task數量過低,則浪費了 CPU 資源;推薦的最小block時間間隔為50毫秒。

顯式對 Input DStream 重新分區:在進行更深層次處理前,先對輸入數據重新分區。

大數據技術分享:Spark Streaming 技術點彙總

數據處理並行度:reduceByKey、reduceByKeyAndWindow 等 operation 可通過設置“spark.default.parallelism”參數或顯式設置並行度方法參數控制。

數據序列化:可配置更高效的 Kryo 序列化。

設置合理批次時間間隔

原則:處理數據的速度應大於或等於數據輸入的速度,即批次處理時間大於或等於批次時間間隔。

方法:

先設置批次時間間隔為5-10秒以降低數據輸入速度;

再通過查看 log4j 日誌中的“Total delay”,逐步調整批次時間間隔,保證“Total delay”小於批次時間間隔。

內存調優

持久化級別:開啟壓縮,設置參數“spark.rdd.compress”。

GC策略:在Driver和Executor上開啟CMS。


分享到:


相關文章: