Kafka 2.2.0基礎入門

Kafka 2.2.0基礎入門

編者: wRitchie(吳理琪) 來源:http://www.bj9420.com

簡介

Apache Kafka® 是 一個分佈式流處理平臺。

流處理平臺有以下三種特性:

1. 可以發佈和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。 
2. 可以儲存流式的記錄,並且有較好的容錯性。 
3. 可以在流式記錄產生時就進行處理。 

Kafka適合什麼樣的場景?

可以用於兩大類別的應用:

1. 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (相當於message queue) 
2. 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化) 

Kafka相關概念:

 Kafka作為一個集群,運行在一臺或者多臺服務器上. 
 Kafka 通過 topic 對存儲的流數據進行分類。 
 每條記錄中包含一個key,一個value和一個timestamp(時間戳)。

Kafka有四個核心的API:

The Producer API 允許一個應用程序發佈一串流式的數據到一個或者多個Kafka topic。 
The Consumer API 允許一個應用程序訂閱一個或多個topic ,並且對發佈給他們的流式數據進行處理。 
The Streams API 允許一個應用程序作為一個流處理器,消費一個或者多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。 
The Connector API 允許構建並運行可重用的生產者或者消費者,將Kafka topics連接到已存在的應用程序或者數據系統。比如,連接到一個關係型數據庫,捕捉表(table)的所有變更內容。
 

如下圖所示:

Kafka 2.2.0基礎入門

一、 zookeeper安裝

zookeeper是一個為分佈式應用提供一致性服務的軟件,它是開源的Hadoop項目的一個子項目,並根據google發表的一篇論文來實現的。zookeeper為分佈式系統提供了高效且易於使用的協同服務,它可以為分佈式應用提供相當多的服務,諸如統一命名服務,配置管理,狀態同步和組服務等。zookeeper接口簡單,不必過多地糾結在分佈式系統編程難於處理的同步和一致性問題上,可以使用zookeeper提供的現成(off-the-shelf)服務來實現來實現分佈式系統額配置管理,組管理,Leader選舉等功能。

1、下載zookeeper,到以下網址下載最新版本Zookeeper-3.5.5的安裝包apache-zookeeper-3.5.5-bin.tar.gz:

 https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.5/

注意:一定要下載含bin的tar.gz包,不然運行不起來。

2、安裝及配置zookeeper,將下載的apache-zookeeper-3.5.5-bin.tar.gz包上傳至Linux服務器/usr/local目錄下,解壓:

 tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz

將解壓後的apache-zookeeper-3.5.5-bin重命名為:zookeeper-3.5.5,進入文件夾下的conf目錄,增加配置文件,即將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下,使用默認的配置:

 # The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

3、啟動或停止zookeeper,進入bin目錄:

 cd /usr/local/zookeeper-3.5.5/bin

啟動zookeeper,執行如下命令:

 ./zkServer.sh start

執行啟動成功,顯示如下:

 ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

停止zookeeper,執行如下命令:

 ./zkServer.sh stop

執行停止成功,顯示下如:

 ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.5.5/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

4、測試zookeeper是否正常工作,進入/usr/local/zookeeper-3.5.5/bin目錄下,執行以下命令:

 ./zkCli.sh -server127.0.0.1:2181

Linux控制檯顯示如下類似內容,表示zookeeper正常工作:

 2019-05-24 11:25:13,878 [myid:] - INFO [main:ClientCnxnSocket@237] - jute.maxbuffer value is 4194304 Bytes
2019-05-24 11:25:13,885 [myid:] - INFO [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled=
Welcome to ZooKeeper!
2019-05-24 11:25:13,891 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1112] - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2019-05-24 11:25:13,943 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@959] - Socket connection established, initiating session, client: /0:0:0:0:0:0:0:1:48680, server: localhost/0:0:0:0:0:0:0:1:2181
[zk: localhost:2181(CONNECTING) 0] 2019-05-24 11:25:13,969 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x10015424cbe0000, negotiated timeout = 30000
 
WATCHER::
 
WatchedEvent state:SyncConnected type:None path:null

二、 Kafka安裝

1、下載Kafka,到以下網址下載最新版本kafka-2.2.0的安裝包

 https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.12-2.2.0.tgz

2、安裝kafka,將下載的kafka_2.12-2.2.0.tgz包上傳至Linux服務器/usr/local目錄下,解壓:

 tar -xzf kafka_2.12-2.2.0.tgz

3、配置kafka,進入/usr/local/kafka_2.12-2.2.0/config目錄下,修改server.properties配置文件即可,本文使用默認配置文件,根據需要,可自行修改,默認端口為9092,發要修改,改成對應的服務IP及端口號即可:

 listeners=PLAINTEXT://:9092

4、啟動kafka服務器,進入解壓後目錄/usr/local/kafka_2.12-2.2.0/bin,命令如下:

 ./kafka-server-start.sh ../config/server.properties

結果如下:

./kafka-topics.sh --list --bootstrap-server localhost:9092
writchie

執行成功,顯示如下:

 [2019-05-24 12:34:38,396] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-05-24 12:34:38,397] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-05-24 12:34:38,398] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-05-24 12:34:38,438] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-05-24 12:34:38,446] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-05-24 12:34:38,451] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-24 12:34:38,451] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-05-24 12:34:38,452] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

5、創建Topic:創建一個名叫writchie的topic,命令如下:

 ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic writchie

6、查看topic情況:

 ./kafka-topics.sh --list --bootstrap-server localhost:9092

7、創建發送者,即向以writchie為topic發送消息,命令如下:

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic writchie
 >first kafka message from wRitchie
 >second kafka message from wRitchie

注意:若要退出發送消息,按 Ctrl+c鍵退出。

8、創建消費者,命令如下:

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic writchie --from-beginning

執行完後,顯示如下:

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic writchie --from-beginning
first kafka message from wRitchie
second kafka message from wRitchie

注意:若要退出接收消息,按 Ctrl+c鍵退出。

9、在Linux系統的兩個不同終端中運行創建發送者和接收者的命令,若在發送者終端發送完消息,則在消費者終端能實時顯示接收到的消息,如下圖所示:

發送者終端:

Kafka 2.2.0基礎入門

消費者終端:

Kafka 2.2.0基礎入門

10、設置多代理集群:到目前為止,一直在與一個代理競爭,但這並不好玩。對於Kafka來說,單個代理只是一個大小為1的集群,因此除了啟動一些代理實例之外沒有太多變化。但是為了感受它,將集群擴展到三個節點(仍然在本地機器上)。

1、為每個代理創建一個配置文件(在Windows上使用copy命令),以下操作在/usr/local/kafka_2.12-2.2.0目錄下進行,命令如下:

# cp config/server.properties config/server-1.properties
# cp config/server.properties config/server-2.properties

2、編輯server-1.properties、server-2.properties文件,修改內容如下:

 config/server-1.properties:
 broker.id=1
 listeners=PLAINTEXT://:9093
 log.dirs=/tmp/kafka-logs-1
 
config/server-2.properties:
 broker.id=2
 listeners=PLAINTEXT://:9094
 log.dirs=/tmp/kafka-logs-2

具體參見下圖:

Kafka 2.2.0基礎入門

注:broker.id屬性是群集中每個節點的唯一且永久的名稱。必須覆蓋端口和日誌目錄,因為在同一臺機器上運行這些,並且希望讓所有代理嘗試在同一端口上註冊或覆蓋彼此的數據。

3、已經啟動了Zookeeper,並啟動了單個節點,因此只需要啟動兩個新節點,打開兩個新的Linux終端,定位到/usr/local/kafka_2.12-2.2.0目錄,運行命令如下:

 bin/kafka-server-start.sh config/server-1.properties &
 bin/kafka-server-start.sh config/server-2.properties &

4、創建一個副本為3的新topic:writchie-replicated-topic,在/usr/local/kafka_2.12-2.2.0目錄,運行命令如下:

 bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic writchie-replicated-topic

5、現在是一個集群,怎麼才能知道那些Broker在做什麼呢?運行“describe topics”命令來查看:

 bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic writchie-replicated-topic

執行完後如下圖:

Kafka 2.2.0基礎入門

以下是對輸出信息的解釋。第一行給出了所有分區的摘要,下面的每行都給出了一個分區的信息。因為只有一個分區,所以只有一行。

Ø “leader”是負責給定分區所有讀寫操作的節點。每個節點都是隨機選擇的部分分區的領導者。 
Ø “replicas”是複製分區日誌的節點列表,不管這些節點是leader還是僅僅活著。 
Ø “isr”是一組“同步”replicas,是replicas列表的子集,它活著並被指到leader。 

注意:在示例中,節點1是該主題中唯一分區的領導者。可以在已創建的原始主題上運行相同的命令來查看它的位置,命令如下:

 ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic writchie

執行完後,如下圖所示:

Kafka 2.2.0基礎入門

這沒什麼大不了,原來的主題沒有副本且在服務器0上。創建集群時,這是唯一的服務器。

6、下面發表一些信息給新topic:

./kafka-console-producer.sh --broker-list localhost:9092 --topic writchie-replicated-topic

執行完如下圖:

Kafka 2.2.0基礎入門

7、消費這些新消息:

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic writchie-replicated-topic

執行完如下圖:

Kafka 2.2.0基礎入門

8、測試一下容錯性。 Broker 1 現在是 leader,讓我們來殺了它:

 ps aux | grep server-1.properties

Linux終端中使用kill -9 進程ID,或者在啟動server-1.properties的終端按Ctrl+c停止節點1。

領導權已經切換到一個從屬節點,而且節點1也不在同步副本集中了:

 ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic writchie-replicated-topic

不過,即便原先寫入消息的leader已經不在,這些消息仍可用於消費:

 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic writchie-replicated-topic

執行結果如下圖:

Kafka 2.2.0基礎入門

至此,kafka基礎入門結束。

附:

問題一: kafka報錯,提示Error: Could not find or load main class kafka.Kafka

解決:下載的文件的是src-tgz文件,改下載二進制tgz文件,參見下圖

Kafka 2.2.0基礎入門


分享到:


相關文章: