SparkStreaming集成kafka實現數據零丟失

在開發中,我們看到SparkStreaming提供的api嚴格上無沒法滿足真實生產環境的需求的,因為不會記錄消費kafka的offset,所以首先了解下,然後做出相應的調整,讓自己的系統更加穩定高效,防止重複消費或是丟失數據。

sparkStreaming 提供 的api介紹如下,具體的實現方式也以代碼的方式奉上。

1.createDirect

數據0丟失,但是會寫Write Ahead Logs, 效率不高,基本上工作中不會用。

2.createDirectStream 【敲黑板】

官網上對這個新接口的介紹很多,大致就是不與zookeeper交互,直接去kafka中讀取數據,自己維護offset,於是速度比KafkaUtils.createStream要快上很多。但有利就有弊:無法進行offset的監控。

優點:

簡單化的並行:一對一的映射kafka和rdd的分區。

效率:不寫Ahead Logs

一次語義:createDirect方法是使用kafka的高級api去去存儲offset到zookeeper,確保數據0丟失,但是有一些情況下,會導致重複消費這是因為spark streaming 接收數據可靠性與zk跟蹤的offset不一致導致的。 今後,方法二,使用kafka 簡單API 不使用zookeeper跟蹤offset,只使用spark streaming 內部的checkpoints,排除了streaming和zk的不一致。所以每個記錄被Spark Streaming有效地精確接收一次,儘管失敗。

項目中需要嘗試使用這個接口,同時還要進行offset的監控,於是只能按照官網所說的,自己將offset寫入zookeeper。

具體api:

方法1:

def createDirectStream[

K: ClassTag,

V: ClassTag,

KD <: decoder="" classtag="">

VD <: decoder="" classtag="">

ssc: StreamingContext,

kafkaParams: Map[String, String],

topics: Set[String]

): InputDStream[(K, V)] {...}

這個方法只有3個參數,使用起來最為方便,但是每次啟動的時候默認從Latest offset開始讀取,或者設置參數auto.offset.reset="smallest"後將會從Earliest offset開始讀取。

顯然這2種讀取位置都不適合生產環境。

方法2:

def createDirectStream[

K: ClassTag,

V: ClassTag,

KD <: decoder="" classtag="">

VD <: decoder="" classtag="">

R: ClassTag] (

ssc: StreamingContext,

kafkaParams: Map[String, String],

fromOffsets: Map[TopicAndPartition, Long],

messageHandler: MessageAndMetadata[K, V] => R

): InputDStream[R] = {...}

這個方法可以在啟動的時候可以設置offset,但參數設置起來複雜很多,首先是fromOffsets: Map[TopicAndPartition, Long]的設置。

代碼實現在這裡:github 項目:easySpark/SparkStreaming


分享到:


相關文章: