Spark核心API的擴展Spark Streaming

一、什麼是Spark Streaming

Spark Streaming是Spark核心API的擴展,使得實時數據流的可伸縮、高吞度量、容錯流方式處理成為可能。數據可從多種數據源攝入比如Kafka、Flume、Twitter、ZeroMQ、Kinesis或TCP socket,可以使用高階函數像map、reduce、join和window等複雜算法。最後處理的數據可以push到文件系統、數據庫和儀表盤。實際上,也可以對數據流應用spark的機器學習和圖計算。

Spark核心API的擴展Spark Streaming

Streaming的內部工作方式如下圖。Streaming接受實時輸入的數據流,然後將數據分割成批次,然後由Spark引擎進行處理,生成按批次最終的結果流。

Spark核心API的擴展Spark Streaming

Spark Streaming提供高級抽象稱之為離散流或DStream,表示連續的數據流。DStream可以通過Kafka、Flume等數據源的輸入數據流來創建,也可以在其他的DStream上通過高級操作來創建。在內部,DStream代表著一系列的RDD。

可以使用java、scala、python語言編寫Spark Streaming程序。

二、基本概念

SparkStream可通過maven訪問,編寫SparkStreaming程序需要在添加maven依賴


org.apache.spark
spark-streaming_2.10
1.6.0

從沒有包含在SparkStreaming核心API的數據源中吸取數據需要添加相應依賴。如下所示:

Spark核心API的擴展Spark Streaming

初始化StreamingContext

要初始化Streaming程序,必須先創建StreamingContext對象,他是所有Spark Streaming的入口點。

import org.apache.spark._

import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(..).setMaster(..)

val ssc = new StreamingContext(conf, Seconds(1))

StreamingContext在內部會創建SparkContext對象,並可使用ssc.sparkContext訪問到。

批次的間隔時間必須依據App的延遲要求和集群的可用資源為基礎進行設置。

也可以通過現有的SparkContext創建StreamingContext

val sc = ... val ssc = new StreamingContext(sc,Seconds(1))

定義context之後,必須做如下操作:

1.通過創建DStream定義輸入源

2.通過對DStream的變換和輸出操作定義流計算

3.使用ssc.start()開始接受數據並進行處理。

4.使用ssc.awaitTernimation()等待處理

5.ssc.stop()手動停止處理。

記憶要點:

1.context一旦啟動,不能設置或添加流計算

2.一旦停止,不能重啟

3.在JVM中同一時刻只能有一個激活的context

4.stop()方法也會停止SparkContext,如果只停止StreamContext的話,使用sc.stop(false)參數

5.可以重複使用SparkContext創建多個StreamingContext,但是要確保前一個StreamingContext已經停止。

Discretized Streams(DStream)

SparkStreaming提供的基本抽象就是離散流。代表連續的數據流。在內部,DStream代表持續的RDD,也是Spark不可變數據集的抽象。每個RDD包含特定間隔的數據,如圖

Spark核心API的擴展Spark Streaming

DStream的任何操作都會翻譯成RDD的操作。

Spark核心API的擴展Spark Streaming

輸入DStream和接收者

輸入DStream代表來自於數據源的輸入數據的Stream。lines就是input DStream,每個input DStream都和一個Receiver對象關聯,從數據源接收數據並存儲在Spark內存中進行處理。

Spark Streaming提供了兩種內置的流來源。

1.基本來源:在StreamingContext的API直接使用。例如文件系統、Socket連接和AKKA actors。

2.高級來源:象Kafka、Flume、Kinesis、Twitter等通過額外額工具類可用,這些要求關聯依賴。

如果要並行接受多個數據流,可以創建多個接收者,他們將會同時接收多個數據流。注意Spark的worker/executor是耗時task,因此會佔用分配的一個內核給Streaming應用。需要記住Spark Streaming需要分配足夠多的內核或線程來處理接收的數據以及運行接收者。

記憶要點:

運行Streaming時,不要使用local或loacal[1],這樣一來只有一個線程運行task。如果使用的DStream是基於Receiver的,比如Socket、Kafka,需要有一個線程運行Receiver,就沒有其他線程處理數據了。

Basic Source

除了Socket之外,Streaming也提供從File和Akka的Acttor創建輸入源。

· FileStream

ssc.fileStream[KClass,VClass,InputClass](dir)

Streaming會監控目錄dir並處理生成的文件,不支持嵌套。

注意事項:

1.文件格式相同

2.必須以移動或重命名方式創建文件

3.一旦移動,不能修改,否則新增數據不能處理。

· FileStream

不需要Receiver,因此不需要分配內核。Python不支持fileStream。

· Stream base on Custom Actors

ssc.actorStream(..)

· RDD隊列流

ssc.queueStream(queue).用於測試目的,每個RDD放入到隊列中可以看成一個批次,象流一樣進行處理。

高級Source

使用單獨的類庫創建DStream,例如Twitter

1.添加依賴spark-streaming-twitter_2.10到maven

2.導入類TwitterUtils TwitterUtil.createStream()

3. 部署 生成jar包,包含所有依賴,部署應用。

注意:這些高級源無法在spark shell使用。如果使用,下載相應的依賴jar並放置到classpath下。

自定義源

實現Receiver接口。

Receiver可靠性

基於可靠性有兩種數據源。象Kafka和Flume等源允許傳輸的數據進行確認。如果系統從這些可靠的源接收到的數據能夠確認正確的接收了數據的話,就可以確定無論何種原因數據沒有丟失。這就會導致兩種接收者:

1.可靠接收者當接收到數據並以副本方式在Spark中進行存儲時,可靠接收者會正確發送確認信息給可靠源。

2.不可靠接收者不可靠接收者不會發送確認信息給源。這可以應用給那些不支持確認的源,或者不需要確認信息的可靠源。


大家可以持續關注小編,我將盡其所能的為大家提供技術性實踐資料、文章、視頻。

Spark核心API的擴展Spark Streaming

感謝大家的支持!


分享到:


相關文章: