Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

分享阿里 P8 高級架構師吐血總結的 《Java 核心知識體系&面試資料.pdf》

據說是阿里 P8 級高級架構師吐血總結的一份 Java 核心知識.pdf, 內容覆蓋很廣,Java 核心基礎、Java 多線程、高併發、Spring、微服務、Netty 與 RPC、Zookeeper、Kafka、RabbitMQ、Habase、設計模式、負載均衡、分佈式緩存、Hadoop、Spark、Storm、雲計算等。

另外,附送 100G 學習、面試視頻文檔喲~

獲取方式:【關注 + 轉發】後,私信我,回覆關鍵字【資源】,即可免費無套路獲取哦~

以下是資源的部分目錄以及內容截圖:

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

重要的事再說一遍,獲取方式:【關注 + 轉發】後,私信我,回覆關鍵字【資源】,即可免費無套路獲取哦~

正文開始

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

目錄

  • 一、什麼是 Kafka ?
  • 二、為什麼要用 Kafka ?
  • 三、Kafka 環境安裝
  • 四、Spring Boot 2.x 整合 Kafka
  • 五、總結
  • 六、GitHub 源碼地址

什麼是 Kafka?

Kafka 是 Apache 基金會開源的一個分佈式發佈 - 訂閱消息中間件,流處理平臺。它起源於 LinkedIn,由 Scala 和 Java兩種語言編寫而成。於 2011 年成為 Apache 項目,2012 成為 Apache 基金會下頂級項目。

Kafka 專為分佈式高吞吐系統而設計。相比較其他消息中間件,如 RabbitMq 等,Kafka 具有更好的吞吐量,內置分區,複製和固有的容錯能力,使得它非常適合應用在大數據領域。另外,Kafka 還支持離線、在線消費消息。

為什麼要用 Kafka

  • 低延遲 - Kafka 支持低延遲消息傳遞,速度極快,能達到 200w 寫/秒;
  • 高性能 - Kafka對於消息的發佈、訂閱都具有高吞吐量。即使存儲了 TB 級的消息,依然能夠保證穩定的性能;
  • 可靠性 - Kafka 是分佈式,分區,複製和容錯的,保證零停機和零數據丟失。
  • 可拓展性 - Kafka 支持集群水平拓展。
  • 耐用性 - Kafka 使用"分佈式提交日誌",消息能夠快速的持久化的磁盤上。
Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

Kafka 環境安裝

接下來,小哈為大家演示一下,在 Linux 系統中,採用最簡單的單機安裝方式, 因為本文著重點還是介紹 Spring Boot 2.x 快速集成整合 Kafka.

下載 Kafka

訪問 Kafka 官網 http://kafka.apache.org/downloads,下載 tgz 包, 這裡演示版本為最新的 2.3.0 版本。

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

解壓,進入目錄

下載下來過後,放置到指定位置,執行命令解壓:

tar -zxvf kafka_2.11-2.3.0.tgz 

解壓完成後,進入 Kafka 目錄下:

cd kafka_2.11-2.3.0

啟動 zookeeper

通過 bin 目錄下的 zookeeper-server-start.sh 啟動腳本,來啟動 zk 單節點實例:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

啟動 Kafka

通過 bin 目錄下的 kafka-server-start.sh 來啟動 :

bin/kafka-server-start.sh config/server.properties

注意:Kafka 默認使用 9092 端口,注意關閉防火牆,阿里雲服務器的話,記得添加安全組。

Spring Boot 2.x 開始整合

新建一個 Spring Boot 2.x Web 工程。

項目結構

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

添加 maven 依賴

小哈這裡完整的 maven 依賴如下:


<project> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelversion>4.0.0/<modelversion>
<parent>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-parent/<artifactid>
<version>2.1.2.RELEASE/<version>
<relativepath>
/<parent>
<groupid>site.exception/<groupid>
<artifactid>spring-boot-kafka/<artifactid>
<version>0.0.1-SNAPSHOT/<version>
<name>spring-boot-kafka/<name>
<description>Demo project for Spring Boot/<description>
<properties>
<java.version>1.8/<java.version>
/<properties>
<dependencies>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-web/<artifactid>
/<dependency>

<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka/<artifactid>
/<dependency>
<dependency>
<groupid>org.springframework.kafka/<groupid>
<artifactid>spring-kafka-test/<artifactid>
<scope>test/<scope>
/<dependency>

<dependency>
<groupid>org.projectlombok/<groupid>
<artifactid>lombok/<artifactid>
<optional>true/<optional>

/<dependency>

<dependency>
<groupid>com.alibaba/<groupid>
<artifactid>fastjson/<artifactid>
<version>1.2.58/<version>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-devtools/<artifactid>
<scope>runtime/<scope>
<optional>true/<optional>
/<dependency>
<dependency>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-starter-test/<artifactid>
<scope>test/<scope>
/<dependency>
/<dependencies>
<build>
<plugins>
<plugin>
<groupid>org.springframework.boot/<groupid>
<artifactid>spring-boot-maven-plugin/<artifactid>
/<plugin>
/<plugins>
/<build>
/<project>

添加 kafka 配置

修改 application.yml 文件,添加 kafka 相關配置:

spring:
kafka:
# 指定 kafka 地址,我這裡在本地,直接就 localhost, 若外網地址,注意修改【PS: 可以指定多個】
bootstrap-servers: localhost:9092
consumer:
# 指定 group_id
group-id: group_id
auto-offset-reset: earliest

# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# 指定消息key和消息體的編解碼方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

關於 auto-offset-reset

auto.offset.reset 配置有3個值可以設置,分別如下:

  • earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
  • latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
  • none: topic各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;

默認建議用 earliest, 設置該參數後 kafka出錯後重啟,找到未消費的offset可以繼續消費。

而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啟kafka,這個設置會從最新的offset開始消費, 中間出問題的那些就不管了。

none 這個設置沒有用過,兼容性太差,經常出問題。

新增一個訂單類

模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其他服務消費:

/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription 訂單實體類
**/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
/**
* 訂單id
*/
private long orderId;
/**
* 訂單號
*/
private String orderNum;
/**
* 訂單創建時間
*/
private LocalDateTime createTime;
}

添加一個消息發佈者

新建一個 KafkaProvider 消息提供者類,源碼如下:

/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription 消息提供者
**/
@Component
@Slf4j
public class KafkaProvider {
/**
* 消息 TOPIC
*/
private static final String TOPIC = "xiaoha";
@Autowired
private KafkaTemplate<string> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
// 構建一個訂單類
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 發送消息,訂單類的 json 作為消息體
ListenableFuture<sendresult>> future =
kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
// 監聽回調
future.addCallback(new ListenableFutureCallback<sendresult>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("## Send message fail ...");
}
@Override
public void onSuccess(SendResult<string> result) {
log.info("## Send message success ...");
}
});
}
}

/<string>/<sendresult>/<sendresult>/<string>

添加一個消息消費者

消息發送出去了,當然就需要一個消費者,消費者拿到消息後,再做相關的業務處理,這裡,小哈僅僅是打印消息體。

添加 KafkaConsumer 消費者類:

/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription 消息消費者
**/
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "xiaoha", groupId = "group_id")
public void consume(String message) {
log.info("## consume message: {}", message);
}
}

通過 @KafkaListener註解,我們可以指定需要監聽的 topic 以及 groupId, 注意,這裡的 topics 是個數組,意味著我們可以指定多個 topic,如:@KafkaListener(topics={"xiaoha","xiaoha2"},groupId="group_id")。

注意:消息發佈者的 TOPIC 需要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。

單元測試

新建單元測試,功能測試消息發佈,以及消費。

/**
* @author 犬小哈
* @date 2019/4/12
* @time 下午3:05
* @discription
**/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
// 發送 1000 個消息
for (int i = 0; i < 1000; i++) {
long orderId = i+1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}

發送 1000 個消息,看消息是否能夠被正常發佈與消費,控制檯日誌如下:

Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

可以發現,1000 個消息被成功發送,且被正常消費。

我們再驗證下 Kafka 的 topic 列表,看 xiaoha 這個 topic 是否正常被創建, 執行 bin 目錄下查看 topic 列表的 kafka-topics.sh 腳本:

bin/kafka-topics.sh --list --zookeeper localhost:2181
Spring Boot 2.x 如何快速整合消息中間件 Kafka?這麼方便的嗎?

好了,大功告成!

總結

小哈今天主要和大家分享了,如何安裝單機版的 kafka 環境、如何在 Spring Boot 2.x 中快速集成消息中間件 Kafka,以及演示了相關示例代碼來發布消息、消費消息,希望大家看完過後有所收穫,下期見!


分享到:


相關文章: