Kafka 使用


前面我們講解了RabbitMQ,也瞭解了RabbitMQ的工作模式,下面我們來看一下Kafka 是如何進行消息發送的,Kafka 號稱性能怪獸,吞吐量非常高,當然kafka 設計之初就是為了解決高吞吐量的問題。Kafka 也一直被用與大數據、海量日誌的處理等問題。

1.新建項目

新建項目模塊fw-cloud-mq-kafka用來測試Kafka 的生產和消息,本文並不會測試Kafka 的海量數據,僅僅為了Kafka的使用。

Kafka 使用

2.maven 配置

配置中我們主要引入了spring-kafka,這是Spring 為Kafka 提供的工具包

<code><dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
/<dependency>
/<dependencies>
/<code>

3.新建啟動類

本項目並沒有加入到微服務中,如果有需要,讀者可以自行加入,當然如果加入了可以從RESTFUL接口接收數據,再推送到對應的Topic 中。

<code>@SpringBootApplication
public class FwKafkaMqApplication {
public static void main(String[] args) {
SpringApplication.run(FwKafkaMqApplication.class, args);
}
}
/<code>

4.應用配置

<code>server:
port: 8781
spring:
application:
name: fw-cloud-mq-kafka
kafka:

bootstrap-servers: localhost:9092
producer:
acks: 1
retries: 0
batch-size: 16384
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:
group-id: testGroup
auto-offset-reset: earliest
enable-auto-commit: true
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


/<code>

kafka 每個配置對應的意思如下:

  • kafka.bootstrap-servers
    指定kafka server的地址,集群配多個,中間,逗號隔開
  • kafka.producer.asks
    可以設置的值為:all, -1, 0, 1
    procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
    acks = 0 如果設置為零,則生產者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩衝區並視為已發送。在這種情況下,無法保證服務器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。

    acks = 1 這意味著leader會將記錄寫入其本地日誌,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄後立即失敗,但在將數據複製到所有的副本服務器之前,則記錄將會丟失。
    acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
  • kafka.producer.retries
    寫入失敗時,重試次數。當leader節點失效,一個副本節點會替代成為leader節點,此時可能出現寫入失敗。
    當retris為0時,produce不會重複。retirs重發,此時repli節點完全成為leader節點,不會產生消息丟失。
  • kafka.producer.batch-size
    每次批量發送消息的數量,produce積累到一定數據,一次發送
  • kafka.producer.key-serializer
    指定消息key編解碼方式
  • kafka.producer.value-serializer
    指定消息體的編解碼方式
  • kafka.consumer.group-id
    指定默認消費者group id --> 由於在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設置組名
  • kafka.consumer.auto-offset-reset

    smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設置smallest
  • kafka.consumer.enable-auto-commit
    設置自動提交offset
  • kafka.consumer.auto-commit-interval
    如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。
  • kafka.consumer.key-deserializer
    指定消息key編解碼方式
  • kafka.consumer.value-deserializer
    指定消息體編解碼方式
    當然kafka 還有很多配置,我們簡單使用這些已經夠了

5.新建發送方

發送方需要引入KafkaTemplate,用來發送消息,發送的時候我們指定消息的topic 和內容。並把消息內容打印出來。並把發送類註冊為組件。

<code>/**
* @author xuyisu
* @description 發送方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwSender {

@Autowired
private KafkaTemplate<string> kafkaTemplate;

public boolean send(){

String message="Hello World:"+ DateUtil.now();
log.info("FwSender:"+message);
//第一個參數是topic,第二個參數是內容
kafkaTemplate.send("fwcloud",message);
return true;
}
}
/<string>/<code>

6.新建消費方

消費放需要添加一個公共方法並設置@KafkaListener和需要監聽的Topic,就可以實現消息的監聽並消費了。

<code>/**
* @author xuyisu
* @description 接收方
* @date 2019/12/18
*/
@Component
@Slf4j
public class FwReceiver {

@KafkaListener(topics = "fwcloud")
public void onMessage(String message){
log.info(message);
}
}
/<code>

7.啟動項目

啟動項目前,需要先啟動kafka
然後啟動單元測試FwSenderTest發送消息
通過控制檯我們看推送的消息

<code>2020-01-12 15:01:34 INFO  main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34

2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
2020-01-12 15:01:34 INFO main com.yisu.mq.kafka.sender.FwSender FwSender:Hello World:2020-01-12 15:01:34
/<code>


然後我們再看一下消費的信息

<code>2020-01-12 15:01:35 INFO  org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
2020-01-12 15:01:35 INFO org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 com.yisu.mq.kafka.consumer.FwReceiver Hello World:2020-01-12 15:01:34
/<code>


關於微服務相關的學習,作者寫了一本很詳細的教程,可以點擊下方鏈接,教程裡面包含詳細的代碼地址,歡迎star,fork


分享到:


相關文章: