Kafka 2.2.0基礎入門
編者: wRitchie(吳理琪) 來源:http://www.bj9420.com
簡介
Apache Kafka® 是 一個分佈式流處理平臺。
流處理平臺有以下三種特性:
1. 可以發佈和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。 2. 可以儲存流式的記錄,並且有較好的容錯性。 3. 可以在流式記錄產生時就進行處理。
Kafka適合什麼樣的場景?
可以用於兩大類別的應用:
1. 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (相當於message queue) 2. 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化)
Kafka相關概念:
Kafka作為一個集群,運行在一臺或者多臺服務器上. Kafka 通過 topic 對存儲的流數據進行分類。 每條記錄中包含一個key,一個value和一個timestamp(時間戳)。
Kafka有四個核心的API:
The Producer API 允許一個應用程序發佈一串流式的數據到一個或者多個Kafka topic。 The Consumer API 允許一個應用程序訂閱一個或多個topic ,並且對發佈給他們的流式數據進行處理。 The Streams API 允許一個應用程序作為一個流處理器,消費一個或者多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。 The Connector API 允許構建並運行可重用的生產者或者消費者,將Kafka topics連接到已存在的應用程序或者數據系統。比如,連接到一個關係型數據庫,捕捉表(table)的所有變更內容。
如下圖所示:
一、 zookeeper安裝
zookeeper是一個為分佈式應用提供一致性服務的軟件,它是開源的Hadoop項目的一個子項目,並根據google發表的一篇論文來實現的。zookeeper為分佈式系統提供了高效且易於使用的協同服務,它可以為分佈式應用提供相當多的服務,諸如統一命名服務,配置管理,狀態同步和組服務等。zookeeper接口簡單,不必過多地糾結在分佈式系統編程難於處理的同步和一致性問題上,可以使用zookeeper提供的現成(off-the-shelf)服務來實現來實現分佈式系統額配置管理,組管理,Leader選舉等功能。
1、下載zookeeper,到以下網址下載最新版本Zookeeper-3.5.5的安裝包apache-zookeeper-3.5.5-bin.tar.gz:
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.5/
注意:一定要下載含bin的tar.gz包,不然運行不起來。
2、安裝及配置zookeeper,將下載的apache-zookeeper-3.5.5-bin.tar.gz包上傳至Linux服務器/usr/local目錄下,解壓:
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz
將解壓後的apache-zookeeper-3.5.5-bin重命名為:zookeeper-3.5.5,進入文件夾下的conf目錄,增加配置文件,即將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下,使用默認的配置:
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/tmp/zookeeper # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1
3、啟動或停止zookeeper,進入bin目錄:
cd /usr/local/zookeeper-3.5.5/bin
啟動zookeeper,執行如下命令:
./zkServer.sh start
執行啟動成功,顯示如下:
ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.5.5/bin/../conf/zoo.cfg Starting zookeeper ... STARTED
停止zookeeper,執行如下命令:
./zkServer.sh stop
執行停止成功,顯示下如:
ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.5.5/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED
4、測試zookeeper是否正常工作,進入/usr/local/zookeeper-3.5.5/bin目錄下,執行以下命令:
./zkCli.sh -server127.0.0.1:2181
Linux控制檯顯示如下類似內容,表示zookeeper正常工作:
2019-05-24 11:25:13,878 [myid:] - INFO [main:ClientCnxnSocket@237] - jute.maxbuffer value is 4194304 Bytes 2019-05-24 11:25:13,885 [myid:] - INFO [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled= Welcome to ZooKeeper! 2019-05-24 11:25:13,891 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1112] - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error) JLine support is enabled 2019-05-24 11:25:13,943 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@959] - Socket connection established, initiating session, client: /0:0:0:0:0:0:0:1:48680, server: localhost/0:0:0:0:0:0:0:1:2181 [zk: localhost:2181(CONNECTING) 0] 2019-05-24 11:25:13,969 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x10015424cbe0000, negotiated timeout = 30000 WATCHER:: WatchedEvent state:SyncConnected type:None path:null
二、 Kafka安裝
1、下載Kafka,到以下網址下載最新版本kafka-2.2.0的安裝包
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.0/kafka_2.12-2.2.0.tgz
2、安裝kafka,將下載的kafka_2.12-2.2.0.tgz包上傳至Linux服務器/usr/local目錄下,解壓:
tar -xzf kafka_2.12-2.2.0.tgz
3、配置kafka,進入/usr/local/kafka_2.12-2.2.0/config目錄下,修改server.properties配置文件即可,本文使用默認配置文件,根據需要,可自行修改,默認端口為9092,發要修改,改成對應的服務IP及端口號即可:
listeners=PLAINTEXT://:9092
4、啟動kafka服務器,進入解壓後目錄/usr/local/kafka_2.12-2.2.0/bin,命令如下:
./kafka-server-start.sh ../config/server.properties
結果如下:
./kafka-topics.sh --list --bootstrap-server localhost:9092 writchie
執行成功,顯示如下:
[2019-05-24 12:34:38,396] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator) [2019-05-24 12:34:38,397] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager) [2019-05-24 12:34:38,398] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator) [2019-05-24 12:34:38,438] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread) [2019-05-24 12:34:38,446] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer) [2019-05-24 12:34:38,451] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser) [2019-05-24 12:34:38,451] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser) [2019-05-24 12:34:38,452] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
5、創建Topic:創建一個名叫writchie的topic,命令如下:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic writchie
6、查看topic情況:
./kafka-topics.sh --list --bootstrap-server localhost:9092
7、創建發送者,即向以writchie為topic發送消息,命令如下:
./kafka-console-producer.sh --broker-list localhost:9092 --topic writchie >first kafka message from wRitchie >second kafka message from wRitchie
注意:若要退出發送消息,按 Ctrl+c鍵退出。
8、創建消費者,命令如下:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic writchie --from-beginning
執行完後,顯示如下:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic writchie --from-beginning first kafka message from wRitchie second kafka message from wRitchie
注意:若要退出接收消息,按 Ctrl+c鍵退出。
9、在Linux系統的兩個不同終端中運行創建發送者和接收者的命令,若在發送者終端發送完消息,則在消費者終端能實時顯示接收到的消息,如下圖所示:
發送者終端:
消費者終端:
10、設置多代理集群:到目前為止,一直在與一個代理競爭,但這並不好玩。對於Kafka來說,單個代理只是一個大小為1的集群,因此除了啟動一些代理實例之外沒有太多變化。但是為了感受它,將集群擴展到三個節點(仍然在本地機器上)。
1、為每個代理創建一個配置文件(在Windows上使用copy命令),以下操作在/usr/local/kafka_2.12-2.2.0目錄下進行,命令如下:
# cp config/server.properties config/server-1.properties # cp config/server.properties config/server-2.properties
2、編輯server-1.properties、server-2.properties文件,修改內容如下:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
具體參見下圖:
注:broker.id屬性是群集中每個節點的唯一且永久的名稱。必須覆蓋端口和日誌目錄,因為在同一臺機器上運行這些,並且希望讓所有代理嘗試在同一端口上註冊或覆蓋彼此的數據。
3、已經啟動了Zookeeper,並啟動了單個節點,因此只需要啟動兩個新節點,打開兩個新的Linux終端,定位到/usr/local/kafka_2.12-2.2.0目錄,運行命令如下:
bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &
4、創建一個副本為3的新topic:writchie-replicated-topic,在/usr/local/kafka_2.12-2.2.0目錄,運行命令如下:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic writchie-replicated-topic
5、現在是一個集群,怎麼才能知道那些Broker在做什麼呢?運行“describe topics”命令來查看:
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic writchie-replicated-topic
執行完後如下圖:
以下是對輸出信息的解釋。第一行給出了所有分區的摘要,下面的每行都給出了一個分區的信息。因為只有一個分區,所以只有一行。
Ø “leader”是負責給定分區所有讀寫操作的節點。每個節點都是隨機選擇的部分分區的領導者。 Ø “replicas”是複製分區日誌的節點列表,不管這些節點是leader還是僅僅活著。 Ø “isr”是一組“同步”replicas,是replicas列表的子集,它活著並被指到leader。
注意:在示例中,節點1是該主題中唯一分區的領導者。可以在已創建的原始主題上運行相同的命令來查看它的位置,命令如下:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic writchie
執行完後,如下圖所示:
這沒什麼大不了,原來的主題沒有副本且在服務器0上。創建集群時,這是唯一的服務器。
6、下面發表一些信息給新topic:
./kafka-console-producer.sh --broker-list localhost:9092 --topic writchie-replicated-topic
執行完如下圖:
7、消費這些新消息:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic writchie-replicated-topic
執行完如下圖:
8、測試一下容錯性。 Broker 1 現在是 leader,讓我們來殺了它:
ps aux | grep server-1.properties
Linux終端中使用kill -9 進程ID,或者在啟動server-1.properties的終端按Ctrl+c停止節點1。
領導權已經切換到一個從屬節點,而且節點1也不在同步副本集中了:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic writchie-replicated-topic
不過,即便原先寫入消息的leader已經不在,這些消息仍可用於消費:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic writchie-replicated-topic
執行結果如下圖:
至此,kafka基礎入門結束。
附:
問題一: kafka報錯,提示Error: Could not find or load main class kafka.Kafka
解決:下載的文件的是src-tgz文件,改下載二進制tgz文件,參見下圖