高性能消息中間件-Kafka詳解

高性能消息中間件-Kafka詳解

簡介

Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源 項目。

Kafka的使用場景

  • 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
  • 消息系統:解耦和生產者和消費者、緩存消息等。
  • 用戶活動跟蹤:Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這
  • 些活動信息被各個服務器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。
  • 運營指標:Kafka也經常用來記錄運營監控數據。包括收集各種分佈式應用的數據,生產各種操作的集中反饋,比如報警和報告。
高性能消息中間件-Kafka詳解

Kafka基本概念

kafka是一個分佈式的,分區的消息(官方稱之為commit log)服務。它提供一個消息系統應該具備的功能,但是確有著獨特的設計。可以這樣來說,Kafka借鑑了JMS規範的思想,但是確並沒有完全遵循JMS規範。

首先,讓我們來看一下基礎的消息(Message)相關術語:


高性能消息中間件-Kafka詳解

從一個較高的層面上來看,producer通過網絡發送消息到Kafka集群,然後consumer來進行消費,如下圖:

高性能消息中間件-Kafka詳解

主題Topic和消息日誌Log

讓我們首先深入理解Kafka提出一個高層次的抽象概念-Topic。

可以理解Topic是一個類別的名稱,同類消息發送到同一個Topic下面。對於每一個Topic,下面可以有多個分區

(Partition)日誌文件:

高性能消息中間件-Kafka詳解


Partition是一個有序的message序列,這些message按順序添加到一個叫做commit log的文件中。每個partition中的消息都有一個唯一的編號,稱之為offset,用來唯一標示某個分區中的message。

注意:每個partition,都對應一個commit log文件。一個partition中的message的offset都是唯一的,但是不同的partition中的message的offset可能是相同的。

可以這麼來理解Topic,Partition和Broker

一個topic,代表邏輯上的一個業務數據集,比如按數據庫裡不同表的數據操作消息區分放入不同topic,訂單相關操作消息放入訂單topic,用戶相關操作消息放入用戶topic,對於大型網站來說,後端數據都是海量的,訂單消息很可能是非常巨量的,比如有幾百個G甚至達到TB級別,如果把這麼多數據都放在一臺機器上可定會有容量限制問題,那麼就可以在topic內部劃分多個partition來分片存儲數據,不同的partition可以位於不同的機器上,每臺機器上都運行一個Kafka的進程Broker。

kafka集群,在配置的時間範圍內,維護所有的由producer生成的消息,而不管這些消息有沒有被消費。例如日誌保留(log retention )時間被設置為2天。kafka會維護最近2天生產的所有消息,而2天前的消息會被丟棄。kafka的性能與保留的數據量的大小沒有關係,因此保存大量的數據(日誌信息)不會有什麼影響。

每個consumer是基於自己在commit log中的消費進度(offset)來進行工作的。在kafka中,消費offset由consumer自己來維護;一般情況下我們按照順序逐條消費commit log中的消息,當然我可以通過指定offset來重複消費某些消息,或者跳過某些消息。


這意味kafka中的consumer對集群的影響是非常小的,添加一個或者減少一個consumer,對於集群或者其他consumer來說,都是沒有影響的,因為每個consumer維護各自的offset。所以說kafka集群是無狀態的,性能不會因為consumer數量受太多影響。kafka還將很多關鍵信息記錄在zookeeper裡,保證自己的無狀態,從而在水平擴容時非常方便。

為什麼要對Topic下數據進行分區存儲?

1、commit log文件會受到所在機器的文件系統大小的限制,分區之後,理論上一個topic可以處理任意數量的數據。

2、為了提高並行度。

分佈式Distribution

log的partitions分佈在kafka集群中不同的broker上,每個broker可以請求備份其他broker上partition上的數據。kafka集群支持配置一個partition備份的數量。

針對每個partition,都有一個broker起到“leader”的作用,0個或多個其他的broker作為“follwers”的作用。leader處理所有的針對這個partition的讀寫請求,而followers被動複製leader的結果。如果這個leader失效了,其中的一個follower將會自動的變成新的leader。

Producers

生產者將消息發送到topic中去,同時負責選擇將message發送到topic的哪一個partition中。通過round­robin做簡單的

負載均衡。也可以根據消息中的某一個關鍵字來進行區分。通常第二種方式使用的更多。

Consumers

傳統的消息傳遞模式有2種:隊列( queue) 和(publish-subscribe)

  • queue模式:多個consumer從服務器中讀取數據,消息只會到達一個consumer。
  • publish-subscribe模式:消息會被廣播給所有的consumer。

Kafka基於這2種模式提供了一種consumer的抽象概念:consumer group。

  • queue模式:所有的consumer都位於同一個consumer group 下。
  • publish-subscribe模式:所有的consumer都有著自己唯一的consumer group。
高性能消息中間件-Kafka詳解

上圖說明:由2個broker組成的kafka集群,總共有4個partition(P0-P3)。這個集群由2個Consumer Group, A有2個consumer instances ,B有四個。通常一個topic會有幾個consumer group,每個consumer group都是一個邏輯上的訂閱者( logicalsubscriber )。每個consumer group由多個consumer instance組成,從而達到可擴展和容災的功能。

消費順序

Kafka比傳統的消息系統有著更強的順序保證。

一個partition同一個時刻在一個consumer group中只有一個consumer instance在消費,從而保證順序。consumer group中的consumer instance的數量不能比一個Topic中的partition的數量多,否則,多出來的consumer消費不到消息。Kafka只在partition的範圍內保證消息消費的局部順序性,不能在同一個topic中的多個partition中保證總的消費順序性。如果有在總體上保證消費順序的需求,那麼我們可以通過將topic的partition數量設置為1,將consumer group中的consumer instance數量也設置為1。

從較高的層面上來說的話,Kafka提供了以下的保證:

發送到一個Topic中的message會按照發送的順序添加到commit log中。意思是,如果消息 M1,M2由同一個producer發送,M1比M2發送的早的話,那麼在commit log中,M1的offset就會比commit 2的offset小。一個consumer在commit log中可以按照發送順序來消費message。如果一個topic的備份因子設置為N,那麼Kafka可以容忍N-1個服務器的失敗,而存儲在commit log中的消息不會丟失。

kafka集群搭建與使用

安裝前的環境準備

由於Kafka是用Scala語言開發的,運行在JVM上,因此在安裝Kafka之前需要先安裝JDK。

<code> yum install java‐1.8.0‐openjdk* ‐y/<code>

kafka依賴zookeeper,所以需要先安裝zookeeper

<code>wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper‐3.4.12.tar.gz
tar ‐zxvf zookeeper‐3.4.12.tar.gz
cd zookeeper‐3.4.12
cp conf/zoo_sample.cfg conf/zoo.cfg
# 啟動zookeeper
bin/zkServer.sh start
bin/zkCli.sh
#查看zk的根目錄相關節點
ls / 
/<code>

第一步:下載安裝包

下載2.2.0 release版本,並解壓:

<code>wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11‐1.1.0.tgz
tar ‐xzf kafka_2.11‐1.1.0.tgz
cd kafka_2.11‐1.1.0/<code>

第二步:啟動服務

現在來啟動kafka服務:

啟動腳本語法:kafka­server­start.sh [­daemon] server.properties

可以看到,server.properties的配置路徑是一個強制的參數,­daemon表示以後臺進程運行,否則ssh客戶端退出後,就會停止服務。(注意,在啟動kafka時會使用linux主機名關聯的ip地址,所以需要把主機名和linux的ip映射配置到本地host裡,用vim /etc/hosts)

<code>bin/kafka‐server‐start.sh ‐daemon config/server.properties
# 我們進入zookeeper目錄通過zookeeper客戶端查看下zookeeper的目錄樹
bin/zkCli.sh
#查看zk的根目錄kafka相關節點
ls / 
#查看kafka節點
ls /brokers/ids /<code>

server.properties核心配置詳解:(詳細請參考官方文檔)

高性能消息中間件-Kafka詳解

第三步:創建主題

現在我們來創建一個名字為“test”的Topic,這個topic只有一個partition,並且備份因子也設置為1:

<code> bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.1.1:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic test/<code>

現在我們可以通過以下命令來查看kafka中目前存在的topic

<code> bin/kafka‐topics.sh ‐‐list ‐‐zookeeper 192.168..0.60:2181/<code>

除了我們通過手工的方式創建Topic,當producer發佈一個消息某個指定的Topic,但是這個Topic並不存在時,就自動創建。

刪除主題

<code>bin/kafka‐topics.sh ‐‐delete ‐‐topic test ‐‐zookeeper 192.168.00:2181/<code>

第四步:發送消息

kafka自帶了一個producer命令客戶端,可以從本地文件中讀取內容,或者我們也可以以命令行中直接輸入內容,並將這些內容以消息的形式發送到kafka集群中。在默認情況下,每一個行會被當做成一個獨立的消息。首先我們要運行發佈消息的腳本,然後在命令中輸入要發送的消息的內容:

<code>bin/kafka‐console‐producer.sh ‐‐broker‐list 192.168.1.1:9092 ‐‐topic test
>this is a msg
>this is a another msg/<code>

第五步:消費消息

對於consumer,kafka同樣也攜帶了一個命令行客戶端,會將獲取到內容在命令中進行輸出,默認是消費最新的消息:

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐consumer‐property group.id=testGroup ‐‐topic test/<code>

如果想要消費之前的消息可以通過--from-beginning參數指定,如下命令:

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐from‐beginning ‐‐topic test/<code>

如果你是通過不同的終端窗口來運行以上的命令,你將會看到在producer終端輸入的內容,很快就會在consumer的終端窗口上顯示出來。

以上所有的命令都有一些附加的選項;當我們不攜帶任何參數運行命令的時候,將會顯示出這個命令的詳細用法。還有一些其他命令如下:

查看組名

<code>bin/kafka‐consumer‐groups.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐list/<code>

查看消費者的消費偏移量

<code>bin/kafka‐consumer‐groups.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐describe ‐‐group testGroup/<code>
高性能消息中間件-Kafka詳解

注意看current-offset 和 log-end-offset還有 lag ,分別為當前消費偏移量,結束的偏移量(HW),落後消費的偏移量

消費多主題

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐whitelist "test|test‐2"/<code>

單播消費

一條消息只能被某一個消費者消費的模式,類似queue模式,只需讓所有消費者在同一個消費組裡即可分別在兩個客戶端執行如下消費命令,然後往主題裡發送消息,結果只有一個客戶端能收到消息

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.16810.60:9092 ‐‐consumer‐property group.id=testGroup ‐‐topic test/<code>

多播消費

一條消息能被多個消費者消費的模式,類似publish-subscribe模式費,針對Kafka同一條消息只能被同一個消費組下的某一個消費者消費的特性,要實現多播只要保證這些消費者屬於不同的消費組即可。我們再增加一個消費者,該消費者屬於testGroup-2消費組,結果兩個客戶端都能收到消息

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐consumer‐property group.id=testGroup‐2 ‐‐topic test/<code>

第六步:kafka集群配置

到目前為止,我們都是在一個單節點上運行broker,這並沒有什麼意思。對於kafka來說,一個單獨的broker意味著kafka集群中只有一個節點。要想增加kafka集群中的節點數量,只需要多啟動幾個broker實例即可。為了有更好的理解,現在我們在一臺機器上同時啟動三個broker實例。首先,我們需要建立好其他2個broker的配置文件:

<code>cp config/server.properties config/server‐1.properties
cp config/server.properties config/server‐2.properties/<code>

配置文件的內容分別如下:

config/server-1.properties:

<code>#broker.id屬性在kafka集群中必須要是唯一
broker.id=1
#kafka部署的機器ip和提供服務的端口號
listeners=PLAINTEXT://192.168.1.1:9093
log.dir=/usr/local/data/kafka‐logs‐1/<code>

config/server-2.properties:

<code>broker.id=2
listeners=PLAINTEXT://192.168.1.1:9094
log.dir=/usr/local/data/kafka‐logs‐2/<code>

目前我們已經有一個zookeeper實例和一個broker實例在運行了,現在我們只需要在啟動2個broker實例即可:

<code>bin/kafka‐server‐start.sh ‐daemon config/server‐1.properties
bin/kafka‐server‐start.sh ‐daemon config/server‐2.properties/<code>

現在我們創建一個新的topic,副本數設置為3,分區數設置為2:

<code>bin/kafka‐topics.sh ‐‐create ‐‐zookeeper 192.168.1.1:2181 ‐‐replication‐factor 3 ‐‐partitions ‐‐topic my‐replicated‐topic/<code>

查看下topic的情況


<code>bin/kafka‐topics.sh ‐‐describe ‐‐zookeeper 192.168.1.1:2181 ‐‐topic my‐replicated‐topic/<code>
高性能消息中間件-Kafka詳解

以下是輸出內容的解釋,第一行是所有分區的概要信息,之後的每一行表示每一個partition的信息。

  • leader節點負責給定partition的所有讀寫請求。
  • replicas 表示某個partition在哪幾個broker上存在備份。不管這個幾點是不是”leader“,甚至這個節點掛了,也會列出。
  • isr 是replicas的一個子集,它只列出當前還存活著的,並且已同步備份了該partition的節點。

我們可以運行相同的命令查看之前創建的名稱為”test“的topic

<code>bin/kafka‐topics.sh ‐‐describe ‐‐zookeeper 192.168.1.1:2181 ‐‐topic test/<code>
高性能消息中間件-Kafka詳解

之前設置了topic的partition數量為1,備份因子為1,因此顯示就如上所示了。當然我們也可以通過如下命令增加topic的分區數量(目前kafka不支持減少分區):

<code>bin/kafka‐topics.sh ‐alter ‐‐partitions 3 ‐‐zookeeper 127.0.0.1:2181 ‐‐topic test/<code>

現在我們向新建的 my-replicated-topic 中發送一些message,kafka集群可以加上所有kafka節點:

<code>bin/kafka‐console‐producer.sh ‐‐broker‐list 192.168.1.1:9092,192.168.1.1:9093,192.168.1.1:9094 ‐‐topic my‐replicated‐topic
>my test msg 1
>my test msg 2/<code>

現在開始消費:

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐from‐beginning ‐‐topic my‐replicated‐topic
my test msg 1
my test msg 2/<code>

現在我們來測試我們容錯性,因為broker1目前是my-replicated-topic的分區0的leader,所以我們要將其kill

<code>ps ‐ef | grep server.properties
kill ‐9 132323/<code>

現在再執行命令:

<code>bin/kafka‐topics.sh ‐‐describe ‐‐zookeeper 192.168.1.1:9092 ‐‐topic my‐replicated‐topic/<code>

我們可以看到,分區0的leader節點已經變成了broker 0。要注意的是,在Isr中,已經沒有了1號節點。leader的選舉也是從ISR(in-sync replica)中進行的。此時,我們依然可以 消費新消息:

<code>bin/kafka‐console‐consumer.sh ‐‐bootstrap‐server 192.168.1.1:9092 ‐‐from‐beginning ‐‐topic my‐replicated‐topic
my test msg 1
my test msg 2/<code>

查看主題分區對應的leader信息:

高性能消息中間件-Kafka詳解

Java客戶端訪問Kafka

引入maven依賴

<code>
org.apache.kafka
kafka‐clients
1.1.0
/<code>

消息發送端代碼

高性能消息中間件-Kafka詳解

高性能消息中間件-Kafka詳解

消息接收端代碼

高性能消息中間件-Kafka詳解

高性能消息中間件-Kafka詳解

高性能消息中間件-Kafka詳解

Spring Boot整合Kafka

引入spring boot kafka依賴,詳見項目實例:spring-boot-kafka

<code>
org.springframework.kafka
spring‐kafka
/<code>

application.yml配置如下:

server:

port: 8080

spring:

kafka:

bootstrap‐servers: 192.168.0.60:9092,192.168.0.60:9093

# 生產者

producer:

# 設置大於0的值,則客戶端會將發送失敗的記錄重新發送

retries: 3

batch‐size: 16384

buffer‐memory: 33554432

# 指定消息key和消息體的編解碼方式

key‐serializer: org.apache.kafka.common.serialization.StringSerializer

value‐serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:

group‐id: mygroup

enable‐auto‐commit: true

發送者代碼:

<code>@RestController 
public class KafkaController {
 @Autowired 
private KafkaTemplate kafkaTemplate; 
 @RequestMapping("/send") 
 public void send() { 
 kafkaTemplate.send("mytopic", 0, "key", "this is a msg"); 
} 
} 
/<code>

消費者代碼:

<code>@Component 
 public class MyConsumer { 
 @KafkaListener(topics = "mytopic",groupId = "testGroup") 
 public void listen(ConsumerRecord record) { 
 String value = record.value(); 
 System.out.println(value); 
 System.out.println(record); 
 } 
 }
/<code>

本文為原創內容,未經允許不得轉載。


分享到:


相關文章: