Spark Streaming的容錯

Spark Streaming的容錯

Spark Streaming類似於Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強這兩個特點。Spark Streaming支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入後可以用 Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。另外 Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。

Spark Streaming 為了實現容錯特性,接收到的數據需要在集群的多個Worker 節點上的 executors 之間保存副本(默認2份)。當故障發生時,有兩種數據需要恢復:

  • 已接收並且有副本的數據。當只有一臺worker 發生故障時,這些數據不會丟失。

  • 已接收但還沒有副本的數據。只能從數據源重新獲取。

我們需要考慮兩種發生故障的情況:

  • Worker 節點故障。如果receiver 運行在發生故障的worker 上,緩存的數據將丟失。

  • Driver 發生故障。很顯然 SparkContext 將會丟失,所有executors連同其內存中的數據將會丟失。

瞭解容錯之前,需要知道的數據處理的類型:

  • 最多一次。數據被處理一次或沒被處理

  • 至少一次。數據被處理一次或多次

  • 僅有一次。有且僅有一次

可以看出,僅有一次是我們需要達到的目標。

Spark Streaming 數據處理的三個步驟:

  • 接收數據。

  • 處理數據。

  • 輸出數據。最終結果被髮送到外部系統。如FileSystem,Database等。

Spark Streaming想要保證數據僅有一次被處理,以上三個步驟均需要保證僅有一次被處理。

1、接收數據。 不同的數據來源有不同的保證。

b、數據來源基於Receiver。容錯將取決於失敗的類型和Receiver的類型。有以下兩種Receiver:

  • 可靠的Receiver。Receiver將會在把接收到的數據保存副本後和Source確認已收到數據。如果此類Receiver發生故障,那麼Source將接收不到確認信息。Receiver重啟後,Source會繼續發送未被確認的信息。

  • 不可靠的Receiver。不會發送確認信息:

如果Worker 發生故障,對於(a)數據不會丟失。對於(b)沒有副本的數據會丟失。

如果Driver 發生故障,所有之前收到的數據都會丟失,這將影響有狀態的操作。

為了解決上述丟失問題,Spark1.2 開始建議使用“write ahead logs” 機制,但是也只能保證“至少處理一次”。

2、處理數據。Spark Streaming 內部RDD保證“僅被處理一次”。

3、輸出數據。默認保證“至少處理一次”。因為它取決於最終結果的操作類型和下游的系統(是否支持事務)。

當worker 發生故障時,輸出操作可能會被執行多次。想要保證“僅被處理一次”,有以下兩種方式:

  • 等價更新。如:輸出操作是 saveAs***Files 操作時,因為寫文件會直接覆蓋原來的文件。

  • 事務更新。使輸出的更新操作都具有事務。

a、使用 batch time (存在於foreachRDD中) 和 RDD 的 partition index 組成唯一標識

b、下游系統使用(a)中唯一標識來判斷此數據是否被處理過。

dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator }}


分享到:


相關文章: