Kafka集群安裝、配置和啟動
Kafka需要依賴zookeeper,並且自身集成了zookeeper,zookeeper至少需要3個節點保證集群高可用,下面是在單機linux下創建kafka3個節點偽集群模式。
1、下載包
下載地址:http://kafka.apache.org/downloads
2、解壓包
tar -zxvf kafka_2.11-1.0.0.tgz\mv kafka_2.11-1.0.0 kafka1\mv kafka_2.11-1.0.0 kafka2\mv kafka_2.11-1.0.0 kafka3
3、創建ZK集群
修改ZK配置文件:kafka1-3/config/zookeeper.properties分別修改對應的參數。
<code>dataDir
=/usr/local/kafka/zookeeper1dataLogDir
=/usr/local/kafka/zookeeper/logclientPort
=2181
maxClientCnxns
=0
tickTime
=2000
initLimit
=100
syncLimit
=5
server.1
=127.0
.0.1
:2888
:3888
server.2
=127.0
.0.1
:4888
:5888
server.3
=127.0
.0.1
:6888
:7888
/<code>
/usr/local/kafka/zookeeper1-3目錄下分別創建myid文件,內容對應1~3
啟動ZK,分別進行Kafka1-3目錄:
bin/zookeeper-server-start.sh config/zookeeper.properties &
啟動報文件失敗,需要手動創建文件目錄並賦予對應的權限。
4、創建Kafka集群
配置文件:kafka1-3/config/server.properties分別修改對應的參數。
<code>broker.id
=1
zookeeper.connect
=localhost:2181
,localhost:2182
,localhost:2183
listeners
=PLAINTEXT://192.168
.12.11
:9091
log.dirs
=/tmp/kafka-logs-1
/<code>
啟動Kafka,分別進行Kafka1-3目錄:
bin/kafka-server-start.sh config/server.properties &
啟動報文件失敗,需要手動創建文件目錄並賦予對應的權限。
5、集群測試
在kafka1上面發送消息:
bin/kafka-console-producer.sh --broker-list localhost:9091 --topic test
在kafka2、kafka3消費消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
Spring Boot 集成 Kafka 實戰
1、添加spring-kafka依賴
<code><
spring-kafka.version
>2.1.0.RELEASEspring-kafka.version
><
dependency
><
groupId
>org.springframework.kafkagroupId
><
artifactId
>spring-kafkaartifactId
><
version
>${spring-kafka.version}version
>dependency
>/<code>
2、添加Spring Boot的自動配置
自動配置類:
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
配置屬性類:
org.springframework.boot.autoconfigure.kafka.KafkaProperties
<code>Spring
:kafka
:bootstrap-servers
:-
192.168
.101
.137
:9091
-
192.168
.101
.137
:9092
-
192.168
.101
.137
:9093
producer
:retries
: 0batch-size
: 16384buffer-memory
: 33554432key-serializer
:org
.apache
.kafka
.common
.serialization
.StringSerializer
value-serializer
:org
.apache
.kafka
.common
.serialization
.StringSerializer
consumer
:group-id
:foo
auto-offset-reset
:earliest
enable-auto-commit
:true
auto-commit-interval
: 100key-deserializer
:org
.apache
.kafka
.common
.serialization
.StringDeserializer
value-deserializer
:org
.apache
.kafka
.common
.serialization
.StringDeserializer
/<code>
3、發送消息
<code>private
KafkaTemplate kafkaTemplate;public
Object send(String msg) { kafkaTemplate.send("test"
,"name"
, msg);return
"send ok"
; }/<code>
4、接收消息
在任何bean裡面,添加@KafkaListener,支持消息接收。
<code>@KafkaListener
(topics ="test"
) public void processMessage(String content) {logger
.info
("收到消息,topic
:test
,msg
:{}",content
); }/<code>
5、參考資料
Spring Boot & Kafka官方文檔:
https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-kafka
Spring for Apache Kafka官方文檔: