這分析絕了,代碼實戰演示Kafka消費者分組(Consumer Group)策略

一、原理介紹

每一個consumer實例都屬於一個consumer group,每一條消息只會被同一個consumer group裡的一個consumer實例消費。(不同consumer group可以同時消費同一條消息)

Kafka保證的是穩定狀態下每一個consumer實例只會消費某一個或多個特定partition的數據,而某個partition的數據只會被某一個特定的consumer實例所消費。這樣設計的劣勢是無法讓同一個consumer group裡的consumer均勻消費數據,優勢是每個consumer不用都跟大量的broker通信,減少通信開銷,同時也降低了分配難度,實現也更簡單。另外,因為同一個partition裡的數據是有序的,這種設計可以保證每個partition裡的數據也是有序被消費。

這分析絕了,代碼實戰演示Kafka消費者分組(Consumer Group)策略

二、SpringBoot集成kafka

代碼下載地址:https://gitee.com/jikeh/JiKeHCN-RELEASE.git

項目名:spring-boot-kafka-group-consumer

1、添加依賴

1)spring-kafka與kafka的版本選擇問題

官網介紹:https://spring.io/projects/spring-kafka

這分析絕了,代碼實戰演示Kafka消費者分組(Consumer Group)策略

This matrix is client compatibility; in most cases (since 0.10.2.0) newer clients can communicate with older brokers. All users with brokers >= 0.10.x.x are recommended to use spring-kafka version 1.3.x or higher

版本選擇:

SpringBoot:1.5.16

Kafka:1.0.2

SpringKafka:1.3.2

2、配置kafka連接屬性

spring.kafka.bootstrap-servers=192.168.0.108:9093,192.168.0.108:9094,192.168.0.108:9095
## 當兩個消費者使用不同的分組時:都會收到消息,類似於"訂閱"功能!!!
## 當使用相同的分組時,當某一個消費者不消費時,另外一個消費者開始消費
spring.kafka.consumer.group-id=myGroup2
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3、生產者

@Component
public class Sender {

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendMessage(){
Message m = new Message();
m.setId(System.currentTimeMillis());
m.setMsg(UUID.randomUUID().toString());
m.setSendTime(new Date());
kafkaTemplate.send("jikeh", JSONObject.toJSONString(m));
}
}

4、構建兩個消費者

@Component
public class Receiver {

@KafkaListener(topics = "jikeh")
public void processMessage(String content) {
Message m = JSONObject.parseObject(content, Message.class);
System.out.println("接受信息:"+m.getMsg());
}
}

三、場景測試

  • 啟動單節點單broker環境

bin/kafka-server-start.sh config/server.properties

  • 創建一個測試topic:jikeh_consumer_group

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jikeh_consumer_group

  • 啟動一個生產者,啟動兩個消費者同屬一個消費者組

測試結果:只有一個消費者能夠收到消息

  • 啟動一個生產者,啟動兩個消費者分別屬於兩個不同的消費者組

測試結果:兩個消費者都能收到消息

如果理解上有困難,環境參考視頻教程:


分享到:


相關文章: