ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd)
}
第一行就是計算得到該批次生成 KafkaRDD 每個分區要消費的最大 offset。 接著看 latestLeaderOffsets(maxRetries)。
@tailrec protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 可以看到的是用來指定獲取最大偏移分區的列表還是隻有currentOffsets,沒有發現關於新增的分區的內容。
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manually
if (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err)
} else {
logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
其中 protected var currentOffsets = fromOffsets,這個僅僅是在構建 DirectKafkaInputDStream 的時候初始化,並在 compute 裡面更新:
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
中間沒有檢測 Kafka 新增 topic 或者分區的代碼,所以可以確認 Spark Streaming 與 kafka 0.8 的版本結合不支持動態分區檢測。
Spark Streaming 與 Kafka 0.10 版本結合
入口同樣是 DirectKafkaInputDStream 的 compute 方法,撿主要的部分說,Compute 裡第一行也是計算當前 job 生成 kafkardd 要消費的每個分區的最大 offset:
// 獲取當前生成job,要用到的KafkaRDD每個分區最大消費偏移值
val untilOffsets = clamp(latestOffsets())
具體檢測 Kafka 新增 topic 或者分區的代碼在 latestOffsets()
/**
* Returns the latest (highest) available offsets, taking new partitions into account. */
protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer
paranoidPoll(c) // 獲取所有的分區信息
val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets
// 做差獲取新增的分區信息
val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit
// 新分區消費位置,沒有記錄的化是由auto.offset.reset決定
currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause
c.pause(newPartitions.asJava) // find latest available offsets
c.seekToEnd(currentOffsets.keySet.asJava)
parts.map(tp => tp -> c.position(tp)).toMap
}
該方法內有獲取 Kafka 新增分區,並將其更新到 currentOffsets 的過程,所以可以驗證 Spark Streaming 與 Kafka 0.10 版本結合支持動態分區檢測。
Flink
入口類是 FlinkKafkaConsumerBase,該類是所有 Flink 的 Kafka 消費者的父類。
在 FlinkKafkaConsumerBase 的 run 方法中,創建了 kafkaFetcher,實際上就是消費者:
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
useMetrics);
接是創建了一個線程,該線程會定期檢測 Kafka 新增分區,然後將其添加到 kafkaFetcher 裡。
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference
public void run() { try { // --------------------- partition discovery loop ---------------------
List
// performing the next operation, so that we can escape the loop as soon as possible
while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
} try {
discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
} // no need to add the discovered partitions if we were closed during the meantime
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
} // do not waste any time sleeping if we're not running anymore
if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loop
break;
}
}
}
} catch (Exception e) {
discoveryLoopErrorRef.set(e);
} finally { // calling cancel will also let the fetcher loop escape
// (if not running, cancel() was already called)
if (running) {
cancel();
}
}
}
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop();
上面,就是 Flink 動態發現 Kafka 新增分區的過程。不過與 Spark 無需做任何配置不同的是,Flink 動態發現 Kafka 新增分區,這個功能需要被使能的。
也很簡單,需要將 flink.partition-discovery.interval-millis 該屬性設置為大於 0 即可。
容錯機制及處理語義
本節內容主要是想對比兩者在故障恢復及如何保證僅一次的處理語義。這個時候適合拋出一個問題:實時處理的時候,如何保證數據僅一次處理語義?
Spark Streaming 保證僅一次處理
對於 Spark Streaming 任務,我們可以設置 checkpoint,然後假如發生故障並重啟,我們可以從上次 checkpoint 之處恢復,但是這個行為只能使得數據不丟失,可能會重複處理,不能做到恰一次處理語義。
對於 Spark Streaming 與 Kafka 結合的 direct Stream 可以自己維護 offset 到 Zookeeper、Kafka 或任何其它外部系統,每次提交完結果之後再提交 offset,這樣故障恢復重啟可以利用上次提交的 offset 恢復,保證數據不丟失。
但是假如故障發生在提交結果之後、提交 offset 之前會導致數據多次處理,這個時候我們需要保證處理結果多次輸出不影響正常的業務。
由此可以分析,假設要保證數據恰一次處理語義,那麼結果輸出和 offset 提交必須在一個事務內完成。在這裡有以下兩種做法:
- repartition(1) Spark Streaming 輸出的 action 變成僅一個 partition,這樣可以利用事務去做:
Dstream.foreachRDD(rdd=>{
rdd.repartition(1).foreachPartition(partition=>{ // 開啟事務
partition.foreach(each=>{// 提交數據
}) // 提交事務
})
})
- 將結果和 offset 一起提交。
也就是結果數據包含 offset。這樣提交結果和提交 offset 就是一個操作完成,不會數據丟失,也不會重複處理。故障恢復的時候可以利用上次提交結果帶的 offset。
Flink 與 kafka 0.11 保證僅一次處理
若要 sink 支持僅一次語義,必須以事務的方式寫數據到 Kafka,這樣當提交事務時兩次 checkpoint 間的所有寫入操作作為一個事務被提交。這確保了出現故障或崩潰時這些寫入操作能夠被回滾。
在一個分佈式且含有多個併發執行 sink 的應用中,僅僅執行單次提交或回滾是不夠的,因為所有組件都必須對這些提交或回滾達成共識,這樣才能保證得到一致性的結果。
Flink 使用兩階段提交協議以及預提交(pre-commit)階段來解決這個問題。
本例中的 Flink 應用如圖 11 所示包含以下組件:
- 一個source,從Kafka中讀取數據(即KafkaConsumer);
- 一個時間窗口化的聚會操作;
- 一個sink,將結果寫回到Kafka(即KafkaProducer)。
下面詳細講解 Flink 的兩段提交思路:
如圖 12 所示,Flink checkpointing 開始時便進入到 pre-commit 階段。
具體來說,一旦 checkpoint 開始,Flink 的 JobManager 向輸入流中寫入一個 checkpoint barrier ,將流中所有消息分割成屬於本次 checkpoint 的消息以及屬於下次 checkpoint 的,barrier 也會在操作算子間流轉。
對於每個 operator 來說,該 barrier 會觸發 operator 狀態後端為該 operator 狀態打快照。
data source 保存了 Kafka 的 offset,之後把 checkpoint barrier 傳遞到後續的 operator。
這種方式僅適用於 operator 僅有它的內部狀態。內部狀態是指 Flink state backends 保存和管理的內容(如第二個 operator 中 window 聚合算出來的 sum)。
當一個進程僅有它的內部狀態的時候,除了在 checkpoint 之前將需要將數據更改寫入到 state backend,不需要在預提交階段做其他的動作。
在 checkpoint 成功的時候,Flink 會正確的提交這些寫入,在 checkpoint 失敗的時候會終止提交,過程可見圖 13。
當結合外部系統的時候,外部系統必須要支持可與兩階段提交協議捆綁使用的事務。
顯然本例中的 sink 由於引入了 kafka sink,因此在預提交階段 data sink 必須預提交外部事務。如下圖:
當 barrier 在所有的算子中傳遞一遍,並且觸發的快照寫入完成,預提交階段完成。
所有的觸發狀態快照都被視為 checkpoint 的一部分,也可以說 checkpoint 是整個應用程序的狀態快照,包括預提交外部狀態。出現故障可以從 checkpoint 恢復。
下一步就是通知所有的操作算子 checkpoint 成功。該階段 jobmanager 會為每個 operator 發起 checkpoint 已完成的回調邏輯。
本例中 data source 和窗口操作無外部狀態,因此該階段,這兩個算子無需執行任何邏輯,但是 data sink 是有外部狀態的,因此,此時我們必須提交外部事務,如下圖:
以上就是 Flink 實現恰一次處理的基本邏輯。
Back pressure
消費者消費的速度低於生產者生產的速度,為了使應用正常,消費者會反饋給生產者來調節生產者生產的速度,以使得消費者需要多少,生產者生產多少。(*back pressure 後面一律稱為背壓。)
Spark Streaming 的背壓
Spark Streaming 跟 Kafka 結合是存在背壓機制的,目標是根據當前 job 的處理情況來調節後續批次的獲取 Kafka 消息的條數。
為了達到這個目的,Spark Streaming 在原有的架構上加入了一個 RateController,利用的算法是 PID,需要的反饋數據是任務處理的結束時間、調度時間、處理時間、消息條數。
這些數據是通過 SparkListener 體系獲得,然後通過 PIDRateEsimator 的 compute 計算得到一個速率,進而可以計算得到一個 offset,然後跟限速設置最大消費條數比較得到一個最終要消費的消息最大 offset。
PIDRateEsimator 的 compute 方法如下:
def compute( time: Long, // in milliseconds
numElements: Long, processingDelay: Long, // in milliseconds
schedulingDelay: Long // in milliseconds
): Option[Double] = {
logTrace(s"\ntime = $time, # records = $numElements, " +
s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)
latestTime = time if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
logTrace("First run, rate estimation skipped") None
} else {
latestRate = newRate
latestError = error
logTrace(s"New rate = $newRate") Some(newRate)
}
} else {
logTrace("Rate estimation skipped") None
}
}
}
Flink 的背壓
與 Spark Streaming 的背壓不同的是,Flink 背壓是 jobmanager 針對每一個 task 每 50ms 觸發 100 次 Thread.getStackTrace() 調用,求出阻塞的佔比。過程如圖 16 所示:
阻塞佔比在 web 上劃分了三個等級:
- OK: 0 <= Ratio <= 0.10,表示狀態良好;
- LOW: 0.10 < Ratio <= 0.5,表示有待觀察;
- HIGH: 0.5 < Ratio <= 1,表示要處理了。
“徵稿啦!”
CSDN 公眾號秉持著「與千萬技術人共成長」理念,不僅以「極客頭條」、「暢言」欄目在第一時間以技術人的獨特視角描述技術人關心的行業焦點事件,更有「技術頭條」專欄,深度解讀行業內的熱門技術與場景應用,讓所有的開發者緊跟技術潮流,保持警醒的技術嗅覺,對行業趨勢、技術有更為全面的認知。
如果你有優質的文章,或是行業熱點事件、技術趨勢的真知灼見,或是深度的應用實踐、場景方案等的新見解,歡迎聯繫 CSDN 投稿,聯繫方式:微信(guorui_1118,請備註投稿+姓名+公司職位),郵箱([email protected])。
閱讀更多 CSDN 的文章