超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

來源 | Alice菌

責編 | Carol

封圖 | CSDN 下載於視覺中國

出品 | CSDN(ID:CSDNnews)

相信很多小夥伴已經接觸過 SparkStreaming 了,理論就不講太多了,今天的內容主要是為大家帶來的是 SparkStreaming 整合 Kafka 的教程。

文中含代碼,感興趣的朋友可以複製動手試試!

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

Kafka回顧

正式開始之前,先讓我們來對Kafka回顧一波。

  • 核心概念圖解

Broker:安裝Kafka服務的機器就是一個broker

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

Producer:消息的生產者,負責將數據寫入到broker中(push)

Consumer:消息的消費者,負責從kafka中拉取數據(pull),老版本的消費者需要依賴zk,新版本的不需要Topic: 主題,相當於是數據的一個分類,不同topic存放不同業務的數據 –主題:區分業務Replication:副本,數據保存多少份(保證數據不丟失) –副本:數據安全Partition:分區,是一個物理的分區,一個分區就是一個文件,一個Topic可以有1~n個分區,每個分區都有自己的副本 –分區:併發讀寫Consumer Group:消費者組,一個topic可以有多個消費者/組同時消費,多個消費者如果在一個消費者組中,那麼他們不能重複消費數據 –消費者組:提高消費者消費速度、方便統一管理注意[1]:一個Topic可以被多個消費者或者組訂閱,一個消費者/組也可以訂閱多個主題注意[2]:讀數據只能從Leader讀, 寫數據也只能往Leader寫,Follower會從Leader那裡同步數據過來做副本!!!

  • 常用命令

啟動kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon 

/export/servers/kafka/config/server.properties

停止kafka

/export/servers/kafka/bin/kafka-server-stop.sh

查看topic信息

/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181

創建topic

/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test

查看某個topic信息

/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

刪除topic

/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test

啟動生產者–控制檯的生產者一般用於測試

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka

啟動消費者–控制檯的消費者一般用於測試

/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka--from-beginning

消費者連接到borker的地址

/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning 

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

整合kafka兩種模式說明

這同時也是一個面試題的熱點。

開發中我們經常會利用SparkStreaming實時地讀取kafka中的數據然後進行處理,在spark1.3版本後,kafkaUtils裡面提供了兩種創建DStream的方法:

1、Receiver接收方式:

  • KafkaUtils.createDstream(開發中不用,瞭解即可,但是面試可能會問)。
  • Receiver作為常駐的Task運行在Executor等待數據,但是一個Receiver效率低,需要開啟多個,再手動合併數據(union),再進行處理,很麻煩

  • Receiver哪臺機器掛了,可能會丟失數據,所以需要開啟WAL(預寫日誌)保證數據安全,那麼效率又會降低!

  • Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。

  • spark在消費的時候為了保證數據不丟也會在Checkpoint中存一份offset,可能會出現數據不一致

  • 所以不管從何種角度來說,Receiver模式都不適合在開發中使用了,已經淘汰了

2、Direct直連方式

  • KafkaUtils.createDirectStream(開發中使用,要求掌握)
  • Direct方式是直接連接kafka分區來獲取數據,從每個分區直接讀取數據大大提高了並行能力
  • Direct方式調用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況
  • 當然也可以自己手動維護,把offset存在mysql、redis中
  • 所以基於Direct模式可以在開發中使用,且藉助Direct模式的特點+手動操作可以保證數據的Exactly once 精準一次

總結:

  • Receiver接收方式
  1. 多個Receiver接受數據效率高,但有丟失數據的風險

  2. 開啟日誌(WAL)可防止數據丟失,但寫兩遍數據效率低。

  3. Zookeeper維護offset有重複消費數據可能。

  4. 使用高層次的API

  • Direct直連方式
  1. 不使用Receiver,直接到kafka分區中讀取數據

  2. 不使用日誌(WAL)機制

  3. Spark自己維護offset

  4. 使用低層次的API

擴展:關於消息語義

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

注意:

開發中SparkStreaming和kafka集成有兩個版本:0.8及0.10+

0.8版本有Receiver和Direct模式(但是0.8版本生產環境問題較多,在Spark2.3之後不支持0.8版本了)。

0.10以後只保留了direct模式(Reveiver模式不適合生產環境),並且0.10版本API有變化(更加強大)

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

結論:

我們學習和開發都直接使用0.10版本中的direct模式,但是關於Receiver和Direct的區別面試的時候要能夠答得上來

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

spark-streaming-kafka-0-8(瞭解)

1.Receiver

KafkaUtils.createDstream使用了receivers來接收數據,利用的是Kafka高層次的消費者api,偏移量由Receiver維護在zk中,對於所有的receivers接收到的數據將會保存在Spark executors中,然後通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日誌,它同步將接受到數據保存到分佈式文件系統上比如HDFS。保證數據在出錯的情況下可以恢復出來。儘管這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是啟用了WAL效率會較低,且無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。

(官方現在已經不推薦這種整合方式。)

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐
  • 準備工作

1)啟動zookeeper集群

zkServer.sh start

2)啟動kafka集群

kafka-server-start.sh /export/servers/kafka/config/server.properties

3.創建topic

kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 3 --topic spark_kafka

4.通過shell命令向topic發送消息

kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka

5.添加kafka的pom依賴


org.apache.spark
spark-streaming-kafka-0-8_2.11
2.2.0
  • API

通過receiver接收器獲取kafka中topic數據,可以並行運行更多的接收器讀取kafak topic中的數據,這裡為3個

 val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})

如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2)

代碼演示

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

object SparkKafka {

def main(args: Array[String]): Unit = {
//1.創建StreamingContext
val config: SparkConf =
new SparkConf.setAppName("SparkStream").setMaster("local[*]")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
//開啟WAL預寫日誌,保證數據源端可靠性
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./kafka")
//==============================================
//2.準備配置參數
val zkQuorum = "node01:2181,node02:2181,node03:2181"
val groupId = "spark"
val topics = Map("spark_kafka" -> 2)//2表示每一個topic對應分區都採用2個線程去消費,
//ssc的rdd分區和kafka的topic分區不一樣,增加消費線程數,並不增加spark的並行處理數據數量
//3.通過receiver接收器獲取kafka中topic數據,可以並行運行更多的接收器讀取kafak topic中的數據,這裡為3個
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
})
//4.使用union方法,將所有receiver接受器產生的Dstream進行合併
val allDStream: DStream[(String, String)] = ssc.union(receiverDStream)
//5.獲取topic的數據(String, String) 第1個String表示topic的名稱,第2個String表示topic的數據
val data: DStream[String] = allDStream.map(_._2)
//==============================================
//6.WordCount
val words: DStream[String] = data.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print
ssc.start
ssc.awaitTermination
}
}

2.Direct

Direct方式會定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量範圍在每個batch裡面處理數據,Spark通過調用kafka簡單的消費者API讀取一定範圍的數據。

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐
  • Direct的缺點是無法使用基於zookeeper的kafka監控工具

  • Direct相比基於Receiver方式有幾個優點:

  1. 簡化並行

    不需要創建多個kafka輸入流,然後union它們,sparkStreaming將會創建和kafka分區數一樣的rdd的分區數,而且會從kafka中並行讀取數據,spark中RDD的分區數和kafka中的分區數據是一一對應的關係。

  2. 高效

    Receiver實現數據的零丟失是將數據預先保存在WAL中,會複製一遍數據,會導致數據被拷貝兩次,第一次是被kafka複製,另一次是寫到WAL中。而Direct不使用WAL消除了這個問題。

  3. 恰好一次語義(Exactly-once-semantics)

    Receiver讀取kafka數據是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數據保存在WAL中保證數據不丟失,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數據被消費了多次。

Direct的Exactly-once-semantics(EOS)通過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。

  • API
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

代碼演示

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}


object SparkKafka2 {
def main(args: Array[String]): Unit = {
//1.創建StreamingContext
val config: SparkConf =
new SparkConf.setAppName("SparkStream").setMaster("local[*]")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./kafka")
//==============================================
//2.準備配置參數
val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "spark")
val topics = Set("spark_kafka")
val allDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//3.獲取topic的數據
val data: DStream[String] = allDStream.map(_._2)
//==============================================
//WordCount
val words: DStream[String] = data.flatMap(_.split(" "))
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
result.print
ssc.start
ssc.awaitTermination
}
}
超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

spark-streaming-kafka-0-10

  • 說明

spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發中使用

  • pom.xml


org.apache.spark
spark-streaming-kafka-0-10_2.11
${spark.version}
  • API:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

  • 創建topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka
  • 啟動生產者

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka 

  • 代碼演示

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkKafkaDemo {
def main(args: Array[String]): Unit = {
//1.創建StreamingContext
//spark.master should be set as local[n], n > 1
val conf = new SparkConf.setAppName("wc").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對數據進行切分形成一個RDD
//準備連接Kafka的參數
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "SparkKafkaDemo",
//earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
//latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
//none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
//這裡配置latest自動重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費,沒有偏移量從新來的數據開始消費

"auto.offset.reset" -> "latest",
//false表示關閉自動提交.由spark幫你提交到Checkpoint或程序員手動維護
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark_kafka")
//2.使用KafkaUtil連接Kafak獲取數據
val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,//位置策略,源碼強烈推薦使用該策略,會讓Spark的Executor和Kafka的Broker均勻對應
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//消費策略,源碼強烈推薦使用該策略
//3.獲取VALUE數據
val lineDStream: DStream[String] = recordDStream.map(_.value)//_指的是ConsumerRecord
val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發過來的value,即一行數據
val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1))
val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)
result.print
ssc.start//開啟
ssc.awaitTermination//等待優雅停止
}
}

好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過程,並帶大家複習了一波Kafka的基礎知識,如果對你有用的話,麻煩動手手點個“在看”吧~

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

本文由作者首發 CSDN 博客,原文鏈接:

https://blog.csdn.net/weixin_44318830/article/details/105612516

超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

☞開源激盪 30 年:從免費社區到價值數十億美元公司

☞理解 AI 最偉大的成就之一:卷積神經網絡的侷限性

☞GitHub 標星 10,000+,Apache 頂級項目 ShardingSphere 的開源之路

☞港科大鄭光廷院士問診未來,揭露 AI 最新應用與實踐

☞大促下的智能運維挑戰:阿里如何抗住“雙11貓晚”?

☞以太坊2.0中的Custody Game及MPC實現

☞很用心的為你寫了9道MySQL面試題,建議收藏!


分享到:


相關文章: