前言
hello,大家好!昨天發了篇官方的 文章。
考慮到有的朋友對Flink瞭解不多,因此今天再補發一篇Flink介紹文章。
還是那句話,希望朋友們在學習開源框架的時候,能更多地從源頭學起。
希望在Java開源框架學習的道路上,能與您一起風雨前行,一起成長,一起快樂!
正文
Apache Flink是一個可對無界和有界數據流進行有狀態計算的分佈式處理框架.
Flink能在所有常見集群環境中運行,並能以內存速度和任意規模進行計算.
處理無界和有界數據
所有類型的數據都可作為事件流來產生. 信用卡交易,傳感器測量,機器日誌,或網站/移動應用程序上的用戶交互,所有這些數據都是以流形式產生的。
數據可作為無界或有界流處理:
無界流(Unbounded streams)
有開始無結束. 無界流是持續的,不會終止,因此必須對其連續處理(即必須在攝取之後立即處理事件)。無法等待所有輸入數據到達,因為輸入是無界的,並且在任何時間點都不會完成傳輸。 處理無界數據通常要求必須以特定順序攝取事件,例如事件發生的順序,以便能夠推斷結果完整性。
有界流(Bounded streams)
有開始有結束. 可以在執行計算之前通過攝取所有數據來處理有界流。 處理有界流不需要有序攝取,因為始終都可對有界數據集進行排序。 有界流的處理也稱為批處理。
Apache Flink擅長處理無界和有界數據集。
對時間和狀態的精確控制,能讓Flink在無界流上運行任何類型的應用程序。
有界流通過算法和數據結構進行內部處理,這些算法和數據結構專門針對固定大小的數據集而設計,因而性能更為出色。
Deploy Applications Anywhere
Apache Flink是一個分佈式系統,因此它需要相應的資源才能執行應用程序。
Flink不僅能與常見的集群資源管理器(如Hadoop YARN,Apache Mesos和Kubernetes)集成,而且還能作為獨立的集群運行。
Flink能很好地與前面列出的每個資源管理器一起工作。 這是通過特定資源管理器的部署模式來實現的,這些模式允許Flink以一貫方式與每個資源管理器進行交互。
部署Flink應用程序時,Flink會根據應用程序配置的並行度(parallelism)自動識別所需資源,並從資源管理器調配這些資源。 如果發生故障,Flink會重新調配新資源來替換髮生故障的容器。
提交或控制應用程序的所有操作都可通過REST API來執行。
Run Applications at any Scale
Flink能在任意規模的集群中運行有狀態流應用程序.
藉助於Flink,應用程序能輕鬆地並行化為數千個同時運行的任務. 因此,應用程序可以利用幾乎無限量的CPU,主內存,磁盤和網絡IO.
而且,Flink還可輕鬆地保存非常大的應用程序狀態。在保證exactly-once 狀態一致的前提下,Flink的異步和增量檢查點算法還能最大化地降低對延遲的影響.
下面是在用戶生產環境中運行Flink應用程序時報告的可擴展性數字,例如
- 應用程序每天處理數萬億個事件
- 應用程序維護多個TB級數據狀態
- 應用程序可運行在數千核之上
Leverage In-Memory Performance
有狀態Flink應用程序對本地狀態訪問進行了優化。 任務狀態始終保留在內存中,如果狀態超過可用內存大小,則會將狀態保存在能高效訪問的磁盤數據結構中。
因此,任務通過訪問本地(通常是內存)狀態來執行所有計算,從而產生非常低的處理延遲。
Flink通過定期異步地將本地狀態檢查點到持久存儲來保證在出現故障時的exactly-once狀態一致性。
Apache Flink是一個可對無界和有界數據流進行有狀態計算的框架。
Flink在不同抽象層次提供多個API,併為常見用例提供專用庫。
在這裡,我們將介紹Flink易於使用和富有表現力的API和庫。
Building Blocks for Streaming Applications
可以由流處理框架構建和執行的應用程序類型是通過框架的controls streams,state和time來定義的。
下面我們將介紹流處理應用程序中的這些構建塊,並說明Flink對它們的處理方法。
Streams
顯然,streams是流處理的基礎.但,流可以具有不同特徵,這些特徵會影響流的處理方式.
Flink是一個多功能的處理框架,可以處理任何形式的流。
有界(Bounded)和無界(unbounded)流
流可以是無界的,也可以是有界的(即:固定大小的數據集). Flink具有處理無界流的複雜功能,但也提供專門的算子來有效地處理有界流。
實時(Real-time)和記錄(recorded)流
所有數據都可當成流. 有兩種處理數據的方式. 一種是實時處理,另一種是先將流存儲到存儲系統(例如,文件系統或對象存儲),然後後期再處理. Flink 應用程序能處理這兩種記錄或實時流.
State
每個特殊的流應用程序都是有狀態的,也就是說,只有在單個事件上應用轉換的應用程序才不需要狀態. 運行基本業務邏輯的任何應用程序都需要記住事件或中間結果,以便能在以後的某個時間點(例如,當接收到下一個事件或在特定的時間段之後)訪問它們。
通過查看Flink在狀態處理上下文中提供的所有特性,即可證實這一點。
Multiple State Primitives
Flink為不同的數據結構提供狀態原語,例如原子值(atomic values),列表(lists)或映射(maps)。 開發人員可以根據函數的訪問模式選擇最有效的狀態原語。
Pluggable State Backends
應用程序狀態可通過可插拔的狀態後端來管理和檢查。Flink具有不同的狀態後端,可以在內存或RocksDB中存儲狀態,RocksDB是一種高效的嵌入式磁盤數據存儲。 也可以插入自定義狀態後端。
Exactly-once state consistency
Flink的檢查點和恢復算法可確保在發生故障時應用程序狀態的一致性。 因此,故障是透明處理的,不會影響應用程序的正確性。
Very Large State
由於其異步和增量檢查點算法,Flink能夠維持TB字節的應用程序狀態。
Scalable Applications
Flink通過將狀態重新分配給更多或更少的workers來支持有狀態應用程序的擴展。
Time
時間是流應用程序的另一個重要組成部分。 大多數事件流都具有固有的時間語義,因為每個事件都是在特定時間點發生的。 此外,許多常見的流計算都是基於時間的,例如窗口聚合,會話化,模式檢測和基於時間的連接。 流處理的一個重要方面是應用程序如何測量時間,即事件時間和處理時間的差異。Flink提供了一組豐富的與時間相關的功能。
Event-time Mode
處理具有事件時間語義的流應用程序會基於事件的時間戳來計算結果。 因此,無論是否處理記錄還是實時事件,事件時間處理都能保證準確一致結果。
Watermark Support
Flink使用水印來推斷事件時間應用中的時間。 水印也是一種靈活的機制,可以權衡結果的延遲和完整性。
Late Data Handling
當以帶有水印的事件時間模式(event-time mode)處理流時,可能會在所有關聯事件到達之前完成計算。這類事件稱為延遲事件。Flink具有多個選項來處理延遲事件,例如通過重新路由輸出方數據來更新之前完成的計算結果。
Processing-time Mode:
除了event-time mode外, Flink還支持processing-time 語義,該處理時間語義執行由處理機器的掛鐘時間(wall-clock time )來觸發計算。processing-time mode適用於具有嚴格低延遲要求的某些應用,這些要求可以容忍近似結果。
Layered APIs
Flink提供三層API。 每個API在簡潔性和表達性之間提供不同的權衡,並針對不同的用例。
ProcessFunctions
ProcessFunctions 是Flink中最具表現力的函數接口.
Flink提供ProcessFunction來處理來自窗口分組中一個或兩個輸入流/事件的單個事件。 ProcessFunctions提供對時間和狀態的細粒度控制。
ProcessFunction可以任意修改其狀態,並可註冊一個可在後期觸發回調函數的定時器。
因此,ProcessFunctions可在許多有狀態事件驅動應用程序中實現複雜的事件業務邏輯。
下面展示了一個KeyedProcessFunction,它針對KeyedStream進行操作並匹配START和END事件。 收到START事件時,該函數會記住其狀態的時間戳,並註冊一個四小時之後觸發的計時器。
如果在計時器觸發之前收到END事件,則該函數會計算END和START事件之間的持續時間,清除狀態並返回該值。 否則,計時器只會清除狀態。
/**
* 匹配START和END事件,並計算兩個元素時間戳之間的差異。
* 第一個String字段是key屬性,第二個String屬性用於標記START和END事件。
*/
public static class StartEndDuration
extends KeyedProcessFunction<string>, Tuple2<string>> {
private ValueState<long> startTime;
@Override
public void open(Configuration conf) {
// 獲得state句柄
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<long>("startTime", Long.class));
}
/** 每次處理事件時都會調用 **/
@Override
public void processElement(
Tuple2<string> in,
Context ctx,
Collector<tuple2>> out) throws Exception {
switch (in.f1) {
case "START":
// 收到start事件則設置開始時間.
startTime.update(ctx.timestamp());
// 註冊timer
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// 發射事件持續時間
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// 清除狀態
startTime.clear();
}
default:
// do nothing
}
}
/** 觸發timer時調用**/
@Override
public void onTimer(long timestamp,OnTimerContext ctx,Collector<tuple2>> out) {
// 事件超時則清除狀態
startTime.clear();
}
}
/<tuple2>/<tuple2>/<string>/<long>/<long>/<string>/<string>
該示例展示了KeyedProcessFunction的相關功能,但同時也說明它是一個相當冗長的接口。
DataStream API
DataStream API提供了常用流處理操作原語,例如:窗口,一次記錄轉換,以及通過查詢外部存儲來豐富事件等操作. DataStream API適用於Java和Scala,並且它是基於函數的,例如map(), reduce(), and aggregate(). 可以通過擴展接口或Java/Scala lambda函數來定義函數。
下面的示例會統計每個會話的點擊次數.
// 網站點擊流
DataStream<click> clicks = ...
DataStream<tuple2>> result = clicks
// project clicks to userId and add a 1 for counting
.map(
//實現MapFunction接口來自定義函數.
new MapFunction<click>>() {
@Override
public Tuple2<string> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// 以userId (field 0)作為key
.keyBy(0)
// 以30分鐘間隙作為會話窗口
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// 計算每個會話的點擊數,以lambda來定義函數。
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
/<string>/<click>/<tuple2>/<click>
SQL & Table API
Flink提供兩個關係API-Table API和SQL. 這兩個API是批處理和流處理的統一API,即,在無界的實時流或有界的記錄流上都可使用相同的語義執行查詢,併產生一致結果。Table API 和 SQL 藉助Apache Calcite來執行解析,驗證,以及查詢優化. 它們能與DataStream/DataSet API無縫集成,並支持用戶自定義的標量,聚合和表值函數。
Flink關係API旨在簡化數據分析,數據流水線和ETL應用程序的定義。
下面的示例展示的是會話點擊流SQL查詢,並計算每個會話的點擊次數。 這與DataStream API示例中的作用相同。
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
Libraries
Flink提供了可處理常見數據處理操作的庫.這些庫嵌在了API中(沒有完全獨立出來).因此,他們可以從API的所有功能中受益,並與其他庫集成。
Complex Event Processing (CEP)
模式檢測是事件流處理中一種非常常見的用例。 Flink的CEP庫提供了一個API來指定事件模式(可聯想一下正則表達式或狀態機)。 CEP庫與Flink的DataStream API集成,以便在DataStream上評估模式。 CEP庫的應用包括網絡入侵檢測,業務流程監控和欺詐檢測。
DataSet API
DataSet API是Flink中執行批處理的核心API。 DataSet API的原語包括map,reduce,(outer) join,co-group和iterate。 所有操作都由算法和數據結構組成,這些算法和數據結構會對內存中的序列化數據進行操作,並在數據大小超過內存預算時溢出到磁盤。 Flink的DataSet API的數據處理算法受到傳統數據庫運算符的啟發,例如 hybrid hash-join或 external merge-sort。
Gelly
Gelly是一個可擴展的圖形處理和分析庫。 Gelly基於DataSet API實現,並能與其集成。 因此,它能從可擴展且強大的片子中受益。Gelly有自己的內置算法 ,例如標籤傳播,三角形枚舉和頁面排序。但為了實現定製化,它也提供了一種Graph API ,藉助該API我們可以簡化自定義圖算法的實現。
Apache Flink是一個用於對無界和有界數據流進行有狀態計算的框架。由於許多流應用程序都會連續運行(停機時間非常少),因此流處理器必須提供出色的故障恢復工具,以及應用程序運行時監視和維護工具。
前面,我們重點介紹了流處理操作。接下來,我們將介紹Flink的故障恢復機制,以及應用程序管理和監控等功能。
全天候運行應用程序
在分佈式系統中,機器和進程故障隨時都可能發生。 像Flink這樣的分佈式流處理器必須要從故障中恢復,以便能夠24/7全天候運行流應用程序. 顯然,這不僅意味著在排除故障後能重啟應用程序,而且還要確保其內部狀態保持一致,以便應用程序可以繼續處理,就像從未發生過故障一樣。
Flink提供了多種手段來確保應用程序保持運行時一致:
Consistent Checkpoints
Flink的恢復機制基於應用程序狀態的一致檢查點.如果發生了故障,應用程序重啟時會根據最後一個檢查點來加載狀態.結合可重置流,該特性能保證exactly-once state consistency.
Efficient Checkpoints
如果應用程序保存了TB級別的狀態數據,那麼在這種情況下檢查應用程序的狀態將變得異常昂貴. Flink可執行異步和增量檢查點,這樣檢查點對應用程序的延遲影響降低到最小水平。
End-to-End Exactly-Once
Flink提供特定存儲系統的事務接收器(sinks),可確保數據只寫一次,即便出現故障也是如此。
Integration with Cluster Managers
Flink能與各種集群資源管理器集成,例如:Hadoop YARN, Mesos,或Kubernetes. 當某個進程失敗時,它會自動啟動一個新進程來接管其工作。
High-Availability Setup
Flink具有高可用性特性,它能消除所有單點故障。 HA模式基於Apache ZooKeeper,這是一種經過驗證的可靠分佈式協調服務。
更新,遷移,暫停和恢復應用程序
對於那些涉及核心業務的流應用程序,我們必須對其細心維護.但更新有狀態流應用程序並非易事.
通常,我們不能像普通應用程序一樣簡單地停止應用程序並重啟到固定版本或改進版本來更新流應用程序,因為我們無法承受丟失應用程序狀態的損失。
Flink的savepoint是一獨特而強大的功能,它可以解決更新有狀態應用程序時所面臨的一系列問題。 savepoint是應用程序狀態的一致快照,因此它與檢查點(checkpoints)非常相似.
但是,與檢查點不同的是savepoint需要手動觸發,並在應用程序停止時需手動刪除(不會自動刪除)。
savepoint可用於啟動與狀態相兼容的應用程序,並初始化其狀態。 savepoint啟用了以下功能:
Application Evolution
savepoint可用來進化應用程序. 應用程序的固定版本或改進版本可通過應用程序之前版本保存的savepoint來重新啟動. 也可以從較早的時間點(假設存在這樣的savepoint)啟動應用程序,以便修復缺陷版本所產生的錯誤結果。
Cluster Migration
使用savepoint,可以將應用程序遷移(或克隆)到不同的集群。
Flink Version Updates
可使用savepoint來將應用程序遷移到新的Flink版本上運行.
Application Scaling
savepoint可用來加大或降低應用程序的併發度。
A/B Tests and What-If Scenarios
可通過同一savepoint來啟動所有版本的應用程序,以便比較不同版本間的性能或質量差異。
Pause and Resume
可通過savepoint來暫停或停止應用程序. 然後,在之後的任意時間點,都可通過該savepoint來恢復應用程序.
Archiving
可以存檔savepoint,以便能夠將應用程序的狀態重置為較早的時間點。
監控和控制應用程序
與任何其他服務一樣,需要對連續運行的流應用程序進行監控,並將這些信息集成到日誌服務中。
監控有助於預測問題並提前做出反應。 日誌則有助於排查問題的根本原因。 最後,控制應用程序運行的管理界面也是一個重要功能。Flink能很好地與許多常見的日誌和監控服務集成,並且它還提供了REST API來控制應用程序,以及查詢相關信息。
Web UI
Flink提供了一個web UI來檢查,監視和調試正在運行的應用程序。此外,還可通過該web UI來提交或取消作業.
Logging
Flink實現了當下最為流行的slf4j logging接口,並能與log4j或logback這樣的日誌框架無縫集成.
Metrics
Flink提供了非常複雜的指標系統,這些指標系統會收集系統層面和用戶層面定義的指標數據. 指標可通過多種方式獲取.
例如:JMX, Ganglia, Graphite, Prometheus, StatsD, Datadog, 和Slf4j.
REST API
Flink暴露了一個REST API來執行某些操作,比如:提交新應用程序,獲取正在運行應用程序的savepoing,或取消正在運行的應用程序. 此外,該API還暴露了與應用程序相關的元數據和運行/完成指標。
閱讀更多 Java源 的文章