十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

1.Flink架構及特性分析

Flink是個相當早的項目,開始於2008年,但只在最近才得到注意。Flink是原生的流處理系統,提供high level的API。Flink也提供 API來像Spark一樣進行批處理,但兩者處理的基礎是完全不同的。Flink把批處理當作流處理中的一種特殊情況。在Flink中,所有 的數據都看作流,是一種很好的抽象,因為這更接近於現實世界。

1.1 基本架構

下面我們介紹下Flink的基本架構,Flink系統的架構如Spark類似,是一個基於Master-Slave風格的架構。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

當 Flink 集群啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager, JobManager 再調度任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計信息彙報給 JobManager。 TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。

Client 為提交 Job 的客戶端,可以是運行在任何機器上(如 JobManager 環境連通即可)。提交 Job 後,Client 可以結束進程 (Streaming的任務),也可以不結束並等待結果返回。

JobManager 主要負責調度 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包 等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。

TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要 部署的 Task,部署啟動後,與自己的上游建立 Netty 連接,接收數據並處理。

JobManager

JobManager是Flink系統的協調者,它負責接收Flink Job,調度組成Job的多個Task的執行。同時,JobManager還負責收集Job 的狀態信息,並管理Flink集群中從節點TaskManager。JobManager所負責的各項管理功能,它接收到並處理的事件主要包括:

RegisterTaskManager

在Flink集群啟動的時候,TaskManager會向JobManager註冊,如果註冊成功,則JobManager會向TaskManager回覆消息 AcknowledgeRegistration。

SubmitJob

Flink程序內部通過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。

CancelJob

請求取消一個Flink Job的執行,CancelJob消息中包含了Job的ID,如果成功則返回消息CancellationSuccess,失敗則返回消息 CancellationFailure。

UpdateTaskExecutionState

TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態信息,更新成功則返回true。

RequestNextInputSplit

運行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。

JobStatusChanged

ExecutionGraph向JobManager發送該消息,用來表示Flink Job的狀態發生的變化,例如:RUNNING、CANCELING、 FINISHED等。

TaskManager

TaskManager也是一個Actor,它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。每個TaskManager負責管理 其所在節點上的資源信息,如內存、磁盤、網絡,在啟動的時候將資源的狀態向JobManager彙報。TaskManager端可以分成兩個 階段:

註冊階段

TaskManager會向JobManager註冊,發送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然 後TaskManager就可以進行初始化過程。

可操作階段

該階段TaskManager可以接收並處理與Task有關的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接 到JobManager,這是TaskManager就失去了與JobManager的聯繫,會自動進入“註冊階段”,只有完成註冊才能繼續處理Task 相關的消息。

Client

當用戶提交一個Flink程序時,會首先創建一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集群中處 理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,並建立到JobManager的連接,將Flink Job提交給 JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 並且是以JobGraph的形式提交的。一個JobGraph是一個 Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的如下信息:JobID、Job名稱、配 置信息、一組JobVertex等。

1.2 基於Yarn層面的架構


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

基於yarn層面的架構類似spark on yarn模式,都是由Client提交App到RM上面去運行,然後RM分配第一個container去運行 AM,然後由AM去負責資源的監督和管理。需要說明的是,Flink的yarn模式更加類似spark on yarn的cluster模式,在cluster模式 中,dirver將作為AM中的一個線程去運行,在Flink on yarn模式也是會將JobManager啟動在container裡面,去做個driver類似 的task調度和分配,YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而 AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到 Flink JobManager,並進行後續的映射、調度和計算處理。


1.3 組件棧

Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Deployment層

該層主要涉及了Flink的部署模式,Flink支持多種部署模式:本地、集群(Standalone/YARN)、雲(GCE/EC2)。Standalone 部署模式與Spark類似,這裡,我們看一下Flink on YARN的部署模式

Runtime層

Runtime層提供了支持Flink計算的全部核心實現,比如:支持分佈式Stream處理、JobGraph到ExecutionGraph的映射、調度等 等,為上層API層提供基礎服務。

API層

API層主要實現了面向無界Stream的流處理和麵向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。

Libraries層

該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理 和麵向批處理兩類。面向流處理支持:CEP(複雜事件處理)、基於SQL-like的操作(基於Table的關係操作);面向批處理支持: FlinkML(機器學習庫)、Gelly(圖處理)。

從官網中我們可以看到,對於Flink一個最重要的設計就是Batch和Streaming共同使用同一個處理引擎,批處理應用可以以一種特 殊的流處理應用高效地運行。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

這裡面會有一個問題,就是Batch和Streaming是如何使用同一個處理引擎進行處理的。

1.4 Batch和Streaming是如何使用同一個處理引擎。

下面將從代碼的角度去解釋Batch和Streaming是如何使用同一處理引擎的。首先從Flink測試用例來區分兩者的區別。

Batch WordCount Examples


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Streaming WordCount Examples

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Batch和Streaming採用的不同的ExecutionEnviroment,對於ExecutionEnviroment來說讀到的源數據是一個DataSet,而 StreamExecutionEnviroment的源數據來說則是一個DataStream。


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

接著我們追蹤下Batch的從Optimzer到JobGgraph的流程,這裡如果是Local模式構造的是LocalPlanExecutor,這裡我們只介紹 Remote模式,此處的executor為RemotePlanExecutor


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

最終會調用ClusterClient的run方法將我們的應用提交上去,run方法的第一步就是獲取jobGraph,這個是client端的操作,client 會將jobGraph提交給JobManager轉化為ExecutionGraph。Batch和streaming不同之處就是在獲取JobGraph上面。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

如果我們初始化的FlinkPlan是StreamingPlan,則首先構造Streaming的StreamingJobGraphGenerator去將optPlan轉為 JobGraph,Batch則直接採用另一種的轉化方式。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?


十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

簡而言之,Batch和streaming會有兩個不同的ExecutionEnvironment,不同的ExecutionEnvironment會將不同的API翻譯成不同 的JobGgrah,JobGraph 之上除了 StreamGraph 還有 OptimizedPlan。OptimizedPlan 是由 Batch API 轉換而來的。 StreamGraph 是由 Stream API 轉換而來的,JobGraph 的責任就是統一 Batch 和 Stream 的圖。

1.5 特性分析

高吞吐 & 低延遲

Flink 的流處理引擎只需要很少配置就能實現高吞吐率和低延遲。下圖展示了一個分佈式計數的任務的性能,包括了流數據 shuffle 過程。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

支持 Event Time 和亂序事件

Flink 支持了流處理和 Event Time 語義的窗口機制。

Event time 使得計算亂序到達的事件或可能延遲到達的事件更加簡單。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

狀態計算的 exactly-once 語義

流程序可以在計算過程中維護自定義狀態。

Flink 的 checkpointing 機制保證了即時在故障發生下也能保障狀態的 exactly once 語義。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

高度靈活的流式窗口

Flink 支持在時間窗口,統計窗口,session 窗口,以及數據驅動的窗口

窗口可以通過靈活的觸發條件來定製,以支持複雜的流計算模式。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

帶反壓的連續流模型

數據流應用執行的是不間斷的(常駐)operators。

Flink streaming 在運行時有著天然的流控:慢的數據 sink 節點會反壓(backpressure)快的數據源(sources)。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

容錯性

Flink 的容錯機制是基於 Chandy-Lamport distributed snapshots 來實現的。

這種機制是非常輕量級的,允許系統擁有高吞吐率的同時還能提供強一致性的保障。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Batch 和 Streaming 一個系統流處理和批處理共用一個引擎


Flink 為流處理和批處理應用公用一個通用的引擎。批處理應用可以以一種特殊的流處理應用高效地運行。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

內存管理

Flink 在 JVM 中實現了自己的內存管理。

應用可以超出主內存的大小限制,並且承受更少的垃圾收集的開銷。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

迭代和增量迭代

Flink 具有迭代計算的專門支持(比如在機器學習和圖計算中)。

增量迭代可以利用依賴計算來更快地收斂。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

程序調優


批處理程序會自動地優化一些場景,比如避免一些昂貴的操作(如 shuffles 和 sorts),還有緩存一些中間數據。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

API 和 類庫


流處理應用

DataStream API 支持了數據流上的函數式轉換,可以使用自定義的狀態和靈活的窗口。

右側的示例展示瞭如何以滑動窗口的方式統計文本數據流中單詞出現的次數。

val texts:DataStream[String] = ...

val counts = text .flatMap { line => line.split("\\\\W+") } .map { token => Word(token, 1) } .keyBy("word") .timeWindow(Time.seconds(5), Time.seconds(1)) .sum("freq")

批處理應用

Flink 的 DataSet API 可以使你用 Java 或 Scala 寫出漂亮的、類型安全的、可維護的代碼。它支持廣泛的數據類型,不僅僅是 key/value 對,以及豐富的 operators。

右側的示例展示了圖計算中 PageRank 算法的一個核心循環。

case class Page( pageId: Long, rank:Double) case class Adjacency( id: Long, neighbors:Array[Long])

val result = initialRanks.iterate(30) { pages => pages.join(adjacency).where("pageId").equalTo("pageId") { (page, adj, out : Collector[Page]) => { out.collect(Page(page.id, 0.15 / numPages)) for (n

類庫生態

Flink 棧中提供了提供了很多具有高級 API 和滿足不同場景的類庫:機器學習、圖分析、關係式數據處理。當前類庫還在 beta 狀 態,並且在大力發展。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

廣泛集成

Flink 與開源大數據處理生態系統中的許多項目都有集成。

Flink 可以運行在 YARN 上,與 HDFS 協同工作,從 Kafka 中讀取流數據,可以執行 Hadoop 程序代碼,可以連接多種數據存儲 系統。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

部署

Flink可以單獨脫離Hadoop進行部署,部署只依賴Java環境,相對簡單


網易有數

企業級大數據可視化分析平臺。面向業務人員的自助式敏捷分析平臺,採用PPT模式的報告製作,更加易學易用,具備強大的探索分析功能,真正幫助用戶洞察數據發現價值。

Flink 集群啟動後架構圖。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

當 Flink 集群啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計信息彙報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。

  • Client 為提交 Job 的客戶端,可以是運行在任何機器上(如 JobManager 環境連通即可)。提交 Job 後,Client 可以結束進程(Streaming的任務),也可以不結束並等待結果返回。
  • JobManager 主要負責調度 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源後,會生成優化後的執行計劃,並以 Task 的單元調度到各個 TaskManager 去執行。
  • TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動後,與自己的上游建立 Netty 連接,接收數據並處理。Client 為提交 Job 的客戶端,可以是運行在任何機器上(與 JobManager 環境連通即可)。提交 Job 後,Client 可以結束進程(Streaming的任務),也可以不結束並等待結果返回。

可以看到 Flink 的任務調度是多線程模型,並且不同Job/Task混合在一個 TaskManager 進程中。雖然這種方式可以有效提高 CPU 利用率,但是個人不太喜歡這種設計,因為不僅缺乏資源隔離機制,同時也不方便調試。類似 Storm 的進程模型,一個JVM 中只跑該 Job 的 Tasks 實際應用中更為合理。

Job 例子

本文所示例子為 flink-1.0.x 版本

我們使用 Flink 自帶的 examples 包中的 SocketTextStreamWordCount ,這是一個從 socket 六種統計單詞出現次數的例子。

  • 首先,使用 netcat 啟動本地服務器:
<code>$ nc -l 9000/<code>

然後提交 Flink 程序

$ bin/flink run examples/streaming/SocketTextStreamWordCount.jar \\

--hostname 10.218.130.9 \\

--port 9000

在netcat端輸入單詞並監控 taskmanager 的輸出可以看到單詞統計的結果。

SocketTextStreamWordCount 的具體代碼如下:

public static void main(String[] args) throws Exception {

// 檢查輸入

final ParameterTool params = ParameterTool.fromArgs(args);

...


// set up the execution environment

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


// get input data

DataStream<string> text =/<string>

env.socketTextStream(params.get("hostname"), params.getInt("port"), '\\n', 0);


DataStream<tuple2>> counts =/<tuple2>

// split up the lines in pairs (2-tuples) containing: (word,1)

text.flatMap(new Tokenizer())

// group by the tuple field "0" and sum up tuple field "1"

.keyBy(0)

.sum(1);

counts.print();


// execute program

env.execute("WordCount from SocketTextStream Example");

}

我們將最後一行代碼 env.execute 替換成 System.out.println(env.getExecutionPlan()); 並在本地運行該代碼(併發度設為2),可以得到該拓撲的邏輯執行計劃圖的 JSON 串,將該 JSON 串粘貼到 http://flink.apache.org/visualizer/ 中,能可視化該執行圖。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

但這並不是最終在 Flink 中運行的執行圖,只是一個表示拓撲節點關係的計劃圖,在 Flink 中對應了 SteramGraph。另外,提交拓撲後(併發度設為2)還能在 UI 中看到另一張執行計劃圖,如下所示,該圖對應了 Flink 中的 JobGraph。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Graph

看起來有點亂,怎麼有這麼多不一樣的圖。實際上,還有更多的圖。

Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

  • StreamGraph: 是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
  • JobGraph: StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
  • ExecutionGraph: JobManager 根據 JobGraph 生成的分佈式執行圖,是調度層最核心的數據結構。
  • 物理執行圖: JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的數據結構。

例如上文中的 2個併發度的 SocketTextStreamWordCount 四層執行圖的演變過程如下圖所示(點擊查看大圖):

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

這裡對一些名詞進行簡單的解釋。

  • StreamGraph: 根據用戶通過 Stream API 編寫的代碼生成的最初的圖。
  • StreamNode:用來代表 operator 的類,並具有所有相關的屬性,如併發度、入邊和出邊等。
  • StreamEdge:表示連接兩個StreamNode的邊。
  • JobGraph: StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的數據結構。
  • JobVertex:經過優化後符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。
  • IntermediateDataSet:表示JobVertex的輸出,即經過operator處理產生的數據集。producer是JobVertex,consumer是JobEdge。
  • JobEdge:代表了job graph中的一條數據傳輸通道。source 是 IntermediateDataSet,target 是 JobVertex。即數據通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。
  • ExecutionGraph: JobManager 根據 JobGraph 生成的分佈式執行圖,是調度層最核心的數據結構。
  • ExecutionJobVertex:和JobGraph中的JobVertex一一對應。每一個ExecutionJobVertex都有和併發度一樣多的 ExecutionVertex。
  • ExecutionVertex:表示ExecutionJobVertex的其中一個併發子任務,輸入是ExecutionEdge,輸出是IntermediateResultPartition。
  • IntermediateResult:和JobGraph中的IntermediateDataSet一一對應。每一個IntermediateResult有與下游ExecutionJobVertex相同併發數的IntermediateResultPartition。
  • IntermediateResultPartition:表示ExecutionVertex的一個輸出分區,producer是ExecutionVertex,consumer是若干個ExecutionEdge。
  • ExecutionEdge:表示ExecutionVertex的輸入,source是IntermediateResultPartition,target是ExecutionVertex。source和target都只能是一個。
  • Execution:是執行一個 ExecutionVertex 的一次嘗試。當發生故障或者數據需要重算的情況下 ExecutionVertex 可能會有多個 ExecutionAttemptID。一個 Execution 通過 ExecutionAttemptID 來唯一標識。JM和TM之間關於 task 的部署和 task status 的更新都是通過 ExecutionAttemptID 來確定消息接受者。
  • 物理執行圖: JobManager 根據 ExecutionGraph 對 Job 進行調度後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的數據結構。
  • Task:Execution被調度後在分配的 TaskManager 中啟動對應的 Task。Task 包裹了具有用戶執行邏輯的 operator。
  • ResultPartition:代表由一個Task的生成的數據,和ExecutionGraph中的IntermediateResultPartition一一對應。
  • ResultSubpartition:是ResultPartition的一個子分區。每個ResultPartition包含多個ResultSubpartition,其數目要由下游消費 Task 數和 DistributionPattern 來決定。
  • InputGate:代表Task的輸入封裝,和JobGraph中JobEdge一一對應。每個InputGate消費了一個或多個的ResultPartition。
  • InputChannel:每個InputGate會包含一個以上的InputChannel,和ExecutionGraph中的ExecutionEdge一一對應,也和ResultSubpartition一對一地相連,即一個InputChannel接收一個ResultSubpartition的輸出。

2.Spark Streaming架構及特性分析

2.1 基本架構

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

基於是spark core的spark streaming架構。

Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Streaming的輸入數 據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset ) , 然 後 將 Spark Streaming 中 對 DStream 的 Transformation 操 作 變 為 針 對 Spark 中 對 RDD 的 Transformation操作,將RDD經 過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加, 或者存儲到外部設備。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

簡而言之,Spark Streaming把實時輸入數據流以時間片Δt (如1秒)為單位切分成塊,Spark Streaming會把每塊數據作為一個 RDD,並使用RDD操作處理每一小塊數據。每個塊都會生成一個Spark Job處理,然後分批次提交job到集群中去運行,運行每個 job的過程和真正的spark 任務沒有任何區別。

JobScheduler

負責job的調度

JobScheduler是SparkStreaming 所有Job調度的中心, JobScheduler的啟動會導致ReceiverTracker和JobGenerator的啟動。 ReceiverTracker的啟動導致運行在Executor端的Receiver啟動並且接收數據,ReceiverTracker會記錄Receiver接收到的數據 meta信息。JobGenerator的啟動導致每隔BatchDuration,就調用DStreamGraph生成RDD Graph,並生成Job。JobScheduler 中的線程池來提交封裝的JobSet對象(時間值,Job,數據源的meta)。Job中封裝了業務邏輯,導致最後一個RDD的action被觸 發,被DAGScheduler真正調度在Spark集群上執行該Job。

JobGenerator

負責Job的生成

通過定時器每隔一段時間根據Dstream的依賴關係生一個一個DAG圖。

ReceiverTracker

負責數據的接收,管理和分配

ReceiverTracker在啟動Receiver的時候他有ReceiverSupervisor,其實現是ReceiverSupervisorImpl, ReceiverSupervisor本身啟 動的時候會啟動Receiver,Receiver不斷的接收數據,通過BlockGenerator將數據轉換成Block。定時器會不斷的把Block數據通會不斷的把Block數據通過BlockManager或者WAL進行存儲,數據存儲之後ReceiverSupervisorlmpl會把存儲後的數據的元數據Metadate彙報給ReceiverTracker,其實是彙報給ReceiverTracker中的RPC實體ReceiverTrackerEndpoint。

2.2 基於Yarn層面的架構分析

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

上圖為spark on yarn 的cluster模式,Spark on Yarn啟動後,由Spark AppMaster中的driver(在AM的裡面會啟動driver,主要 是StreamingContext對象)把Receiver作為一個Task提交給某一個Spark Executor;Receive啟動後輸入數據,生成數據塊,讓 後通知Spark AppMaster;Spark AppMaster會根據數據塊生成相應的Job,並把Job的Task提交給空閒Spark Executor 執行。圖 中藍色的粗箭頭顯示被處理的數據流,輸入數據流可以是磁盤、網絡和HDFS等,輸出可以是HDFS,數據庫等。對比Flink和spark streaming的cluster模式可以發現,都是AM裡面的組件(Flink是JM,spark streaming是Driver)承載了task的分配和調度,其他 container承載了任務的執行(Flink是TM,spark streaming是Executor),不同的是spark streaming每個批次都要與driver進行 通信來進行重新調度,這樣延遲性遠低於Flink。

具體實現

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Streaming程序轉換為DStream Graph

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Graph轉換為RDD的Graph

Spark Core處理的每一步都是基於RDD的,RDD之間有依賴關係。下圖中的RDD的DAG顯示的是有3個Action,會觸發3個job, RDD自下向上依 賴,RDD產生job就會具體的執行。從DSteam Graph中可以看到,DStream的邏輯與RDD基本一致,它就是在 RDD的基礎上加上了時間的依賴。RDD的DAG又可以叫空間維度,也就是說整個 Spark Streaming多了一個時間維度,也可以成 為時空維度,使用Spark Streaming編寫的程序與編寫Spark程序非常相似,在Spark程序中,主要通過操作RDD(Resilient Distributed Datasets彈性分佈式數據集)提供的接口,如map、reduce、filter等,實現數據的批處理。而在Spark Streaming 中,則通過操作DStream(表示數據流的RDD序列)提供的接口,這些接口和RDD提供的接口類似。

Spark Streaming把程序中對 DStream的操作轉換為DStream Graph,圖2.1中,對於每個時間片,DStream Graph都會產生一個RDD Graph;針對每個輸出 操作(如print、foreach等),Spark Streaming都會創建一個Spark action;對於每個Spark action,Spark Streaming都會產生 一個相應的Spark job,並交給JobScheduler。JobScheduler中維護著一個Jobs隊列, Spark job存儲在這個隊列中, JobScheduler把Spark job提交給Spark Scheduler,Spark Scheduler負責調度Task到相應的Spark Executor上執行,最後形成 spark的job。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

圖2.3時間維度生成RDD的DAG

Y軸就是對RDD的操作,RDD的依賴關係構成了整個job的邏輯,而X軸就是時間。隨著時間的流逝,固定的時間間隔(Batch Interval)就會生成一個job實例,進而在集群中運行。

代碼實現

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

基於spark 1.5的spark streaming源代碼解讀,基本架構是沒怎麼變化的。

2.3 組件棧

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

支持從多種數據源獲取數據,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從數據源獲取數據之後,可以 使用諸如map、reduce、join和window等高級函數進行復雜算法的處理。最後還可以將處理結果 存儲到文件系統,數據庫和現場 儀表盤。在“One Stack rule them all”的基礎上,還可以使用Spark的其他子框架,如集群學習、圖計算等,對流數據進行處 理。

2.4 特性分析

吞吐量與延遲性

Spark目前在EC2上已能夠線性擴展到100個節點(每個節點4Core),可以以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所做的測試,在 Grep這個 測試中,Spark Streaming中的每個節點的吞吐量是670k records/s,而Storm是115k records/s。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會經過Spark DAG圖分解,以及Spark的任務集的調 度過程,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),所以Spark Streaming能夠滿足 除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

exactly-once 語義

更加穩定的exactly-once語義支持。

反壓能力的支持

Spark Streaming 從v1.5開始引入反壓機制(back-pressure),通過動態控制數據接收速率來適配集群數據處理能力.

Sparkstreaming如何反壓?

簡單來說,反壓機制需要調節系統接受數據速率或處理數據速率,然而系統處理數據的速率是沒法簡單的調節。因此,只能估計當 前系統處理數據的速率,調節系統接受數據的速率來與之相匹配。

Flink如何反壓?

嚴格來說,Flink無需進行反壓,因為系統接收數據的速率和處理數據的速率是自然匹配的。系統接收數據的前提是接收數據的Task 必須有空閒可用的Buffer,該數據被繼續處理的前提是下游Task也有空閒可用的Buffer。因此,不存在系統接受了過多的數據,導 致超過了系統處理的能力。

由此看出,Spark的micro-batch模型導致了它需要單獨引入反壓機制。

反壓與高負載

反壓通常產生於這樣的場景:短時負載高峰導致系統接收數據的速率遠高於它處理數據的速率。

但是,系統能夠承受多高的負載是系統數據處理能力決定的,反壓機制並不是提高系統處理數據的能力,而是系統所面臨負載高於 承受能力時如何調節系統接收數據的速率。

容錯

Driver和executor採用預寫日誌(WAL)方式去保存狀態,同時結合RDD本身的血統的容錯機制。

API 和 類庫

Spark 2.0中引入了結構化數據流,統一了SQL和Streaming的API,採用DataFrame作為統一入口,能夠像編寫普通Batch程序或 者直接像操作SQL一樣操作Streaming,易於編程。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

廣泛集成

除了可以讀取HDFS, Flume, Kafka, Twitter andZeroMQ數據源以外,我們自己也可以定義數據源,支持運行在Yarn, Standalone及EC2上,能夠通過Zookeeper,HDFS保證高可用性,處理結果可以直接寫到HDFS

部署性

依賴java環境,只要應用能夠加載到spark相關的jar包即可。

3.Storm架構及特性分析

3.1 基本架構

Storm集群採用主從架構方式,主節點是Nimbus,從節點是Supervisor,有關調度相關的信息存儲到ZooKeeper集群中。架構如下:

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Nimbus

Storm集群的Master節點,負責分發用戶代碼,指派給具體的Supervisor節點上的Worker節點,去運行Topology對應的組件 (Spout/Bolt)的Task。

Supervisor

Storm集群的從節點,負責管理運行在Supervisor節點上的每一個Worker進程的啟動和終止。通過Storm的配置文件中的 supervisor.slots.ports配置項,可以指定在一個Supervisor上最大允許多少個Slot,每個Slot通過端口號來唯一標識,一個端口號 對應一個Worker進程(如果該Worker進程被啟動)。

ZooKeeper

用來協調Nimbus和Supervisor,如果Supervisor因故障出現問題而無法運行Topology,Nimbus會第一時間感知到,並重新分配 Topology到其它可用的Supervisor上運行。

運行架構

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

運行流程

1)戶端提交拓撲到nimbus。

2) Nimbus針對該拓撲建立本地的目錄根據topology的配置計算task,分配task,在zookeeper上建立assignments節點存儲 task和supervisor機器節點中woker的對應關係;

在zookeeper上創建taskbeats節點來監控task的心跳;啟動topology。

3) Supervisor去zookeeper上獲取分配的tasks,啟動多個woker進行,每個woker生成task,一個task一個線程;根據topology 信息初始化建立task之間的連接;Task和Task之間是通過zeroMQ管理的;後整個拓撲運行起來。

3.2 基於Yarn層面的架構

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

在YARN上開發一個應用程序,通常只需要開發兩個組件,分別是客戶端和ApplicationMaster,其中客戶端主要作用是提交應用程 序到YARN上,並和YARN和ApplicationMaster進行交互,完成用戶發送的一些指令;而ApplicationMaster則負責向YARN申請 資源,並與NodeManager通信,啟動任務。

不修改任何Storm源代碼即可將其運行在YARN之上,最簡單的實現方法是將Storm的各個服務組件(包括Nimbus和Supervisor) 作為單獨的任務運行在YARN上,而Zookeeper作為一個公共的服務運行在YARN集群之外的幾個節點上。

1)通過YARN-Storm Client將Storm Application提交到YARN的RM上;

2)RM為YARN-Storm ApplicationMaster申請資源,並將其運行在一個節點上(Nimbus);

3)YARN-Storm ApplicationMaster 在自己內部啟動Nimbus和UI服務;

4)YARN-Storm ApplicationMaster 根據用戶配置向RM申請資源,並在申請到的Container中啟動Supervisor服務;

3.3 組件棧

3.4 特性分析

簡單的編程模型。

類似於MapReduce降低了並行批處理複雜性,Storm降低了進行實時處理的複雜性。

服務化

一個服務框架,支持熱部署,即時上線或下線App.

可以使用各種編程語言

你可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單 的Storm通信協議即可。

容錯性

Storm會管理工作進程和節點的故障。

水平擴展

計算是在多個線程、進程和服務器之間並行進行的。

可靠的消息處理

Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息。

快速

系統的設計保證了消息能得到快速的處理,使用ZeroMQ作為其底層消息隊列。

本地模式

Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群。這讓你可以快速進行開發和單元測試。

部署性

依賴於Zookeeper進行任務狀態的維護,必須首先部署Zookeeper。

4.三種框架的對比分析

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

對比分析

如果對延遲要求不高的情況下,建議使用Spark Streaming,豐富的高級API,使用簡單,天然對接Spark生態棧中的其他組 件,吞吐量大,部署簡單,UI界面也做的更加智能,社區活躍度較高,有問題響應速度也是比較快的,比較適合做流式的ETL,而 且Spark的發展勢頭也是有目共睹的,相信未來性能和功能將會更加完善。

如果對延遲性要求比較高的話,建議可以嘗試下Flink,Flink是目前發展比較火的一個流系統,採用原生的流處理系統,保證了低延遲性,在API和容錯性上也是做的比較完善,使用起來相對來說也是比較簡單的,部署容易,而且發展勢頭也越來越好,相信後面社區問題的響應速度應該也是比較快的。

個人對Flink是比較看好的,因為原生的流處理理念,在保證了低延遲的前提下,性能還是比較好的,且越來越易用,社區也在不斷 發展。

在流式計算領域,同一套系統需要同時兼具容錯和高性能其實非常難,同時它也是衡量和選擇一個系統的標準。在這個領域,Flink和Spark無疑是彼此非常強勁的對手。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

2. Flink VS Spark 之 Connectors

Spark 支持的Connectors如下所示:

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

Flink支持的Connectors如下所示:

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

從Flink和Spark Connectors對比可以看出,Spark與Flink支持的Connectors的數量差不多,目前來說可能Spark支持更多一些,Flink後續的支持也會逐步的完善。

3. Flink VS Spark 之 運行環境

Spark 與Flink所支持的運行環境基本差不多,都比較廣泛。

十年大佬帶你Flink,SparkStreaming,Storm經典對戰,誰將勝出?

本文較長,感謝觀看。


分享到:


相關文章: