大數據乾貨分享:Spark Streaming 數據清理機制

大數據乾貨分享:Spark Streaming 數據清理機制

DStream 和 RDD

我們知道Spark Streaming 計算還是基於Spark Core的,Spark Core 的核心又是RDD. 所以Spark Streaming 肯定也要和RDD扯上關係。然而Spark Streaming 並沒有直接讓用戶使用RDD而是自己抽象了一套DStream的概念。 DStream 和 RDD 是包含的關係,你可以理解為Java裡的裝飾模式,也就是DStream 是對RDD的增強,但是行為表現和RDD是基本上差不多的。都具備幾個條件:

  • 具有類似的tranformation動作,比如map,reduceByKey等,也有一些自己獨有的,比如Window,mapWithStated等
  • 都具有Action動作,比如foreachRDD,count等

從編程模型上看是一致的。

所以很可能你寫的那堆Spark Streaming代碼看起來好像和Spark 一致的,然而並不能直接複用,因為一個是DStream的變換,一個是RDD的變化。

Spark Streaming中 DStream 介紹

DStream 下面包含幾個類:

  • 數據源類,比如InputDStream,具體如DirectKafkaInputStream等
  • 轉換類,典型比如MappedDStream,ShuffledDStream
  • 輸出類,典型比如ForEachDStream

從上面來看,數據從開始(輸入)到結束(輸出)都是DStream體系來完成的,也就意味著用戶正常情況是無法直接去產生和操作RDD的,這也就是說,DStream有機會和義務去負責RDD的生命週期。

這就回答了前言中的問題了。Spark Streaming具備自動清理功能。

大數據乾貨分享:Spark Streaming 數據清理機制


RDD 在Spark Stream中產生的流程

在Spark Streaming中RDD的生命流程大體如下:

在Spark Streaming中RDD的生命流程大體如下:

  • 在InputDStream會將接受到的數據轉化成RDD,比如DirectKafkaInputStream 產生的就是 KafkaRDD
  • 接著通過MappedDStream等進行數據轉換,這個時候是直接調用RDD對應的map方法進行轉換的
  • 在進行輸出類操作時,才暴露出RDD,可以讓用戶執行相應的存儲,其他計算等操作。


我們這裡就以下面的代碼來進行更詳細的解釋:

val source = KafkaUtils.createDirectInputStream(....)
source.map(....).foreachRDD{rdd=>
rdd.saveTextFile(....)
}

foreachRDD 產生ForEachDStream,因為foreachRDD是個Action,所以會觸發任務的執行,會被調用generateJob方法。

override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}

對應的parent是MappedDStream,也就是說調用MappedDStream.getOrCompute.該方法在DStream中,首先會在MappedDStream對象中的generatedRDDs 變量中查找是否已經有RDD,如果沒有則觸發計算,並且將產生的RDD放到generatedRDDs

@transientprivate[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// If RDD was already generated, then retrieve it from HashMap,
// or else compute the RDD
generatedRDDs.get(time).orElse {
....
generatedRDDs.put(time, newRDD)
....

計算RDD是調用的compute方法,MappedDStream 的compute方法很簡單,直接調用的父類也就是DirectKafkaInputStream的getOrCompute方法:

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}

在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,這是個數據源,所以他的compute方法會直接new出一個RDD.

在上面的例子中,MappedDStream 的parent是DirectKafkaInputStream中,這是個數據源,所以他的compute方法會直接new出一個RDD.

從上面可以得出幾個結論:

  • 數據源以及轉換類DStream都會維護一個generatedRDDs,可以按batchTime 進行獲取
  • 內部本質還是進行的RDD的轉換
大數據乾貨分享:Spark Streaming 數據清理機制


如果我們調用了cache會發生什麼

這裡又會有兩種情況,一種是調用DStream.cache,第二種是RDD.cache。事實上他們是完全一樣的。DStream的cache 動作只是將DStream的變量storageLevel 設置為MEMORY_ONLY_SER,然後在產生(或者獲取)RDD的時候,調用RDD的persit方法進行設置。所以DStream.cache 產生的效果等價於RDD.cache(也就是你自己調用foreachRDD 將RDD 都設置一遍)

進入正題,我們是怎麼釋放Cache住的RDD的

其實無所謂Cache不Cache住,RDD最終都是要釋放的,否則運行久了,光RDD對象也能承包了你的內存。我們知道,在Spark Streaming中,週期性產生事件驅動Spark Streaming 的類其實是:

org.apache.spark.streaming.scheduler.JobGenerator

他內部有個永動機(定時器),定時發佈一個產生任務的事件:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

然後通過processEvent進行事件處理:

 
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
}

}

目前我們只關注ClearMetadata 事件。對應的方法為:

private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time)
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
// checkpointing of this batch to complete.
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
markBatchFullyProcessed(time)
}
}

首先是清理輸出DStream(比如ForeachDStream),接著是清理輸入類(基於Receiver模式)的數據。ForeachDStream 其實調用的也是DStream的方法。該方法大體如下:

private[streaming] def clearMetadata(time: Time) {
val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
if (unpersistData) {
logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false)
rdd match {
case b: BlockRDD[_] =>
logInfo("Removing blocks of RDD " + b + " of time " + time)
b.removeBlocks()
case _ =>
}
}
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +

(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
}

大體執行動作如下描述:

  • 根據記憶週期得到應該剔除的RDD
  • 根據是否要清理cache數據,進行unpersit 操作,並且顯示的移除block
  • 根據依賴調用其他的DStream進行動作清理

這裡我們還可以看到,通過參數spark.streaming.unpersist 你是可以決定是否手工控制是否需要對cache住的數據進行清理。

這裡你會有兩個疑問:

  • dependencies 是什麼?
  • rememberDuration 是怎麼來的?

dependencies 你可以簡單理解為父DStream,通過dependencies 我們可以獲得已完整DStream鏈。rememberDuration 的設置略微複雜些,大體是 slideDuration,如果設置了checkpointDuration 則是2*checkpointDuration 或者通過DStreamGraph.rememberDuration(如果設置了的話,譬如通過StreamingContext.remember方法,不過通過該方法設置的值要大於計算得到的值會生效)

如果你用了window* 相關的操作,則在此之前的DStream 的rememberDuration 都需要加上windowDuration。

然後根據Spark Streaming的定時性,每個週期只要完成了,都會觸發清理動作,這個就是清理動作發生的時機。代碼如下:

def onBatchCompletion(time: Time) { 
eventLoop.post(ClearMetadata(time))
}


總結

Spark Streaming 會在每個Batch任務結束時進行一次清理動作。每個DStream 都會被掃描,不同的DStream根據情況不同,保留的RDD數量也是不一致的,但都是根據rememberDuration變量決定,而該變量會被下游的DStream所影響,所以不同的DStream的rememberDuration取值是不一樣的。

最後,想要學習更多spark,大數據知識或者想獲取更多相關學習資料。

轉發關注後,後臺私信我【資料】即可獲得資料免費領取方式!

大數據乾貨分享:Spark Streaming 數據清理機制

大數據乾貨分享:Spark Streaming 數據清理機制


分享到:


相關文章: