輕鬆上手 Spring Boot & Kafka 實戰!


輕鬆上手 Spring Boot & Kafka 實戰!


Kafka集群安裝、配置和啟動

Kafka需要依賴zookeeper,並且自身集成了zookeeper,zookeeper至少需要3個節點保證集群高可用,下面是在單機linux下創建kafka3個節點偽集群模式。

1、下載包

下載地址:http://kafka.apache.org/downloads

2、解壓包

tar -zxvf kafka_2.11-1.0.0.tgz\mv kafka_2.11-1.0.0 kafka1\mv kafka_2.11-1.0.0 kafka2\mv kafka_2.11-1.0.0 kafka3

3、創建ZK集群

修改ZK配置文件:kafka1-3/config/zookeeper.properties分別修改對應的參數。

<code>

dataDir

=/usr/local/kafka/zookeeper1

dataLogDir

=/usr/local/kafka/zookeeper/log

clientPort

=

2181

maxClientCnxns

=

0

tickTime

=

2000

initLimit

=

100

syncLimit

=

5

server.1

=

127.0

.

0.1

:

2888

:

3888

server.2

=

127.0

.

0.1

:

4888

:

5888

server.3

=

127.0

.

0.1

:

6888

:

7888

/<code>

/usr/local/kafka/zookeeper1-3目錄下分別創建myid文件,內容對應1~3

啟動ZK,分別進行Kafka1-3目錄:

bin/zookeeper-server-start.sh config/zookeeper.properties &

啟動報文件失敗,需要手動創建文件目錄並賦予對應的權限。

4、創建Kafka集群

配置文件:kafka1-3/config/server.properties分別修改對應的參數。

<code>

broker.id

=

1

 

zookeeper.connect

=localhost:

2181

,localhost:

2182

,localhost:

2183

listeners

=PLAINTEXT://

192.168

.

12.11

:

9091

 

log.dirs

=/tmp/kafka-logs-

1

/<code>

啟動Kafka,分別進行Kafka1-3目錄:

bin/kafka-server-start.sh config/server.properties &

啟動報文件失敗,需要手動創建文件目錄並賦予對應的權限。

5、集群測試

在kafka1上面發送消息:

bin/kafka-console-producer.sh --broker-list localhost:9091 --topic test

在kafka2、kafka3消費消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

Spring Boot 集成 Kafka 實戰

1、添加spring-kafka依賴

<code>

<

spring-kafka.version

>

2.1.0.RELEASE

spring-kafka.version

>

<

dependency

>

    

<

groupId

>

org.springframework.kafka

groupId

>

    

<

artifactId

>

spring-kafka

artifactId

>

    

<

version

>

${spring-kafka.version}

version

>

dependency

>

/<code>

2、添加Spring Boot的自動配置

自動配置類:

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

配置屬性類:

org.springframework.boot.autoconfigure.kafka.KafkaProperties

<code>

Spring

:   

kafka

:     

bootstrap-servers

:       

-

 192

.168

.101

.137

:9091

      

-

 192

.168

.101

.137

:9092

      

-

 192

.168

.101

.137

:9093

    

producer

:       

retries

: 0       

batch-size

: 16384       

buffer-memory

: 33554432       

key-serializer

org

.apache

.kafka

.common

.serialization

.StringSerializer

      

value-serializer

org

.apache

.kafka

.common

.serialization

.StringSerializer

    

consumer

:       

group-id

foo

      

auto-offset-reset

earliest

      

enable-auto-commit

true

      

auto-commit-interval

: 100       

key-deserializer

org

.apache

.kafka

.common

.serialization

.StringDeserializer

      

value-deserializer

org

.apache

.kafka

.common

.serialization

.StringDeserializer

/<code>

3、發送消息

<code> 

private

 KafkaTemplate kafkaTemplate;

public

 Object send(String msg) {     kafkaTemplate.send(

"test"

"name"

, msg);     

return

 

"send ok"

; }/<code>

4、接收消息

在任何bean裡面,添加@KafkaListener,支持消息接收。

<code>

@KafkaListener

(topics = 

"test"

) public void processMessage(String content) {     

logger

.info

("收到消息, 

topic

:test

msg

:{}", 

content

); }/<code>

5、參考資料

Spring Boot & Kafka官方文檔:

https://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-kafka

Spring for Apache Kafka官方文檔:


分享到:


相關文章: