簡介
Apache Kafka是由Apache軟件基金會開發的一個開源分佈式消息系統項目,由Scala寫成。該項目的目標是為處理實時數據提供一個統一、高通量、低等待的平臺。它是一種分佈式的,基於發佈/訂閱的消息系統,以可水平擴展和高吞吐率而被廣泛使用。目前越來越多的開源分佈式處理系統如Cloudera、Apache Storm、Spark都支持與Kafka集成,已被多家不同類型的公司作為多種類型的數據管道和消息系統使用。
為什麼要用kafka
活動流數據是幾乎所有站點在對其網站使用情況做報表時都要用到的數據中最常規的部分。活動數據包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索情況等內容。這種數據通常的處理方式是先把各種活動以日誌的形式寫入某種文件,然後週期性地對這些文件進行統計分析。運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日誌等等數據)。運營數據的統計方法種類繁多。
近年來,活動和運營數據處理已經成為了網站軟件產品特性中一個至關重要的組成部分,這就需要一套稍微更加複雜的基礎設施對其提供支持。
使用消息系統的好處是:
l 解耦
在項目啟動之初來預測將來項目會碰到什麼需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
l 冗餘
有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
l 擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
l 靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
l 可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。
l 順序保證
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
l 緩衝
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩衝層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩衝有助於控制和優化數據流經過系統的速度。
l 異步通信
很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。
基礎知識
Broker - 代理。Kafka集群包含一個或多個服務器,這種服務器被稱為broker;
Topic - 話題,有時Topic也被稱為seed(種子),特別是在接口裡。每條發佈到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處);
Partition - 分區。Parition是物理上的概念,每個Topic包含一個或多個Partition.;
Producer - 生產者。負責發佈消息到Kafka broker;
Consumer - 消費者。向Kafka broker讀取消息的客戶端;
Consumer Group - 消費者群組。每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group id,若不指定group id則屬於默認的group)。
Kafka拓撲結構
![Kafka學習指南](http://p2.ttnews.xyz/loading.gif)
Kafka拓撲結構
第一次親密接觸
1、下載及安裝
(1)下載:
> wget –q http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.9.1-0.8.2.1.tgz
(2) 解壓:
> tar -xzf kafka_2.9.1-0.8.2.1.tgz
(3) 進入kafka根目錄:
> cd kafka_2.9.1-0.8.2.1
2、啟動服務
(a)啟動zookeeper
> bin/zookeeper-server-start.sh config/zookeeper.properties
(b)啟動kafka
> bin/kafka-server-start.sh config/server.properties
3、創建主題
(a)創建一個名為test的主題
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(b)查看正在運行的主題列表
> bin/kafka-topics.sh --list --zookeeper localhost:2181
4、消息發送
(a)運行一個生產者,併發送消息,主題為test
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(b)發送消息
This is a message
This is another message
5、消息接收
(a)運行一個消費者,並收到信息,主題為test
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
(b)消息接收
This is a message
This is another message
6、停止服務
(a)停止zookeeper
> bin/zookeeper-server-stop.sh
(b)停止kafka
> bin/kafka-server-stop.sh
7、java開發
(a)包引用
<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka_2.10/<artifactid>
<version>0.8.2.1/<version>
(b)生產者代碼producer
package my.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerTest {
public static void main(String[] args) {
//屬性
Properties props = new Properties();
props.put("metadata.broker.list", "192.168.56.102:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
//構造生產者屬性
ProducerConfig config =
new ProducerConfig(props);//生成生產者
Producer<string> producer = new Producer<string>(config);/<string>/<string>
//主題
String topic = "test";
//消息
String msg = "恭喜你發財!";
//構造消息體
KeyedMessage<string> data = new KeyedMessage<string>(topic, msg);/<string>/<string>
//消息發送
producer.send(data);
//生產者關閉
producer.close();
}
}
(a) 消費者代碼Consumer
package my.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class HighLevelConsumerTest{
public static void main(String[] args) {
//屬性
Properties props = new Properties();
props.put("zookeeper.connect","192.168.56.102:2181");
props.put("group.id", "0");
//構造消費者屬性
ConsumerConfig config = new ConsumerConfig(props);
//生成消費者
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
//主題
String topic = "test";
//偵聽
Map<string> topickMap = new HashMap<string>(); /<string>/<string>
topickMap.put(topic, 1);
Map<string>byte[],byte[]>>> streamMap = consumer.createMessageStreams(topickMap); /<string>
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
while(it.hasNext()){
System.err.println("get data:" +
new String(it.next().message()));}
}
}
集群
1、 單節點,一個broker的集群
![Kafka學習指南](http://p2.ttnews.xyz/loading.gif)
上例就是一個單節點,一個broker的集群。
2、 單節點,多個broker的集群
以下命令是上例中命令的補充,即可組成多broker集群。
(a)為多個broker創建配置文件,這裡再準備兩個
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
(b)編輯配置文件,設置brokerId,port以及持久化文件所在目錄
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
(c) 啟動多個kafka服務
> bin/kafka-server-start.sh config/server-1.properties &
> bin/kafka-server-start.sh config/server-2.properties &
(d) 創建一個新主題my-replicated-topic,且複製三個複本
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
(e)查詢集群裡每個broker的工作
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
輸出:
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
名詞解釋:
l "leader" - 每個Partition(分區)以及它的複本,都會選出一個leader,作為生產者和消費者的唯一交互。該命令顯示意為,leader是id為1的broker。
l "replicas" - 複本,分區的備份,該命令顯示意為,在broker為1、2、0的broker裡,都有該話題該分區的複本,其中一個為leader,其餘的為follower,當leader當機時,kafka會從follower裡自動選舉出一個新的leader,以繼續工作。
l "isr" - 複本列表,指明還在活動的,與leader有關聯的複本集合。
3、 多節點,多個broker的集群
同上,只是broker的地址不一樣,不過多個broker必須指向同一個zookeeper集群。
4、 java開發
這裡的代碼,只做上面java代碼的增量補充。
(b) 生產者代碼producer
修改生產者屬性:
props.put("metadata.broker.list", "192.168.56.102:9092");
多broker集群時,增加已上屬性的值即可,用逗號分隔,如
props.put("metadata.broker.list", "192.168.56.102:9092, 192.168.56.102:9093, 192.168.1.133:9092");
以上則指向兩個節點,共三個broker的集群。
(c) 消費者代碼Consumer
消費者代碼不需要變動,因為消費者由zookeeper集群來通知去哪個broker哪個Partition拉數據,它並不關心broker集群的變化。
設計解析
1、 Topic & Parition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條消息放進哪個queue裡。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。若創建topic1和topic2兩個topic,且分別有13個和19個分區,則整個集群上會相應會生成共32個文件夾。
每個日誌文件都是一個log entrie序列,每個log entrie包含一個4字節整型數值(值為N+5),1個字節的"magic value",4個字節的CRC校驗碼,其後跟N個字節的消息體。每條消息都有一個當前Partition下唯一的64字節的offset,它指明瞭這條消息的起始位置。
這個log entries並非由一個文件構成,而是分成多個segment,每個segment以該segment第一條消息的offset命名並以“.kafka”為後綴。另外會有一個索引文件,它標明瞭每個segment下包含的log entry的offset範圍,如下圖所示。
因為每條消息都被append到該Partition中,屬於順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫內存還要高,這是Kafka高吞吐率的一個很重要的保證)。
對於傳統的message queue而言,一般會刪除已經被消費的消息,而Kafka集群會保留所有的消息,無論其被消費與否。當然,因為磁盤限制,不可能永久保留所有數據(實際上也沒必要),因此Kafka提供兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。例如可以通過配置server.properties,讓Kafka刪除一週前的數據(默認配置),也可在Partition文件超過1GB時刪除舊數據。
這裡要注意,因為Kafka讀取特定消息的時間複雜度為O(1),即與文件大小無關,所以這裡刪除過期文件與提高Kafka性能無關。選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會為每一個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條消息後遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
2、 Producer消息路由
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪一個Partition。如果Partition機制設置合理,所有消息可以均勻分佈到不同的Partition裡,這樣就實現了負載均衡。如果一個Topic對應一個文件,那這個文件所在的機器I/O將會成為這個Topic的性能瓶頸,而有了Partition後,不同的消息可以並行寫入不同broker的不同Partition裡,極大的提高了吞吐率。可以在server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數量(注意:如果通過該屬性來指定分區數,在broker集群裡,按配置的最大值生效,而不是累加),也可在創建Topic時通過參數指定,同時也可以在Topic創建之後通過Kafka提供的工具修改。
在發送一條消息時,可以指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中如果key可以被解析為整數則將對應的整數與Partition總數取餘,該消息會被髮送到該數對應的Partition。(每個Parition都會有個序號,序號從0開始)
3、 Consumer Group
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
這是Kafka用來實現一個Topic消息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段。一個Topic可以對應多個Consumer Group。如果需要實現廣播,只要每個Consumer有一個獨立的Group就可以了。要實現單播只要所有的Consumer在同一個Group裡。用Consumer Group還可以將Consumer進行自由的分組而不需要多次發送消息到不同的Topic。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可以使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將數據實時備份到另一個數據中心,只需要保證這三個操作所使用的Consumer屬於不同的Consumer Group即可。下圖是Kafka在Linkedin的一種簡化部署示意圖。
4、 Push vs. Pull
作為一個消息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push消息並由Consumer從broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,採用push模式。事實上,push模式和pull模式各有優劣。
push模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。push模式的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據Consumer的消費能力以適當的速率消費消息。
對於Kafka而言,pull模式更合適。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
5、 Kafka delivery guarantee
有這麼幾種可能的delivery guarantee:
· At most once 消息可能會丟,但絕不會重複傳輸
· At least one 消息絕不會丟,但可能會重複傳輸
· Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。
當Producer向broker發送消息時,一旦這條消息被commit,因數replication的存在,它就不會丟。但是如果Producer發送數據給broker後,遇到網絡問題而造成通信中斷,那Producer就無法判斷該條消息是否已經commit。雖然Kafka無法確定網絡故障期間發生了什麼,但是Producer可以生成一種類似於主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once。(所以目前默認情況下一條消息從Producer到broker是確保了At least once,可通過設置Producer異步發送實現At most once)。
接下來討論的是消息從broker到Consumer的delivery guarantee語義。(僅針對Kafka consumer high level API)。Consumer在從broker讀取消息後,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的消息的offset。該Consumer下一次再讀該Partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同。當然可以將Consumer設置為autocommit,即Consumer一旦讀到數據立即自動commit。如果只討論這一讀取消息的過程,那Kafka是確保了Exactly once。但實際使用中應用程序並非在Consumer讀取完數據就結束了,而是要進行進一步處理,而數據處理與commit的順序在很大程度上決定了消息從broker和consumer的delivery guarantee semantic。
· 讀完消息先commit再處理消息。這種模式下,如果Consumer在commit後還沒來得及處理消息就crash了,下次重新開始工作後就無法讀到剛剛已提交而未處理的消息,這就對應於At most once
· 讀完消息先處理再commit。這種模式下,如果在處理完消息之後commit之前Consumer crash了,下次重新開始工作時還會處理剛剛未commit的消息,實際上該消息已經被處理過了。這就對應於At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認為是Exactly once。
· 如果一定要做到Exactly once,就需要協調offset和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,Consumer拿到數據後可能把數據放到HDFS,如果把最新的offset和數據本身一起寫到HDFS,那就可以保證數據的輸出和offset的更新要麼都完成,要麼都不完成,間接實現Exactly once。(目前就high level API而言,offset是存於Zookeeper中的,無法存於HDFS,而low level API的offset是由自己去維護的,可以將之存於HDFS中)
總之,Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。
6、 Replication
Kafka在0.8以前的版本中,並不提供High Availablity機制,一旦一個或多個Broker宕機,則宕機期間其上所有Partition都無法繼續提供服務。若該Broker永遠不能再恢復,亦或磁盤故障,則其上數據將丟失。而Kafka的設計目標之一即是提供數據持久化,同時對於分佈式系統來說,尤其當集群規模上升到一定程度後,一臺或者多臺機器宕機的可能性大大提高,對Failover要求非常高。因此,Kafka從0.8開始提供High Availability機制。
引入Replication之後,同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader交互,其它Replica作為Follower從Leader中複製數據。因為需要保證同一個Partition的多個Replica之間的數據一致性。如果沒有一個Leader,所有Replica都可同時讀/寫數據,那就需要保證多個Replica之間互相(N×N條通路)同步數據,數據的一致性和有序性非常難保證,大大增加了Replication實現的複雜性,同時也增加了出現異常的幾率。而引入Leader後,只有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),系統更加簡單且高效。
為了更好的做負載均衡,Kafka儘量將所有的Partition均勻分配到整個集群上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。同時為了提高Kafka的容錯能力,也需要將同一個Partition的Replica儘量分散到不同的機器。實際上,如果所有的Replica都在同一個Broker上,那一旦該Broker宕機,該Partition的所有Replica都無法工作,也就達不到HA的效果。同時,如果某個Broker宕機了,需要保證它上面的負載可以被均勻的分配到其它倖存的所有Broker上。
Kafka分配Replica的算法如下:
1) 將所有Broker(假設共n個Broker)和待分配的Partition排序
2) 將第i個Partition分配到第(i mod n)個Broker上
3) 將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
Producer在發佈消息到某個Partition時,先通過ZooKeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認為已經commit了,Leader將增加HW並且向Producer發送ACK。
為了提高性能,每個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。因此,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發生後該條消息一定能被Consumer消費。
Kafka Replication的數據流
關於負載均衡和高可用性
由以上可知,要實現負載均衡以及高可用性,我們要充分考慮話題分區的數量和使用方式,以及複本數,其中複本數量太多也會對性能有一定的影響。
1、Broker數量
根據實際需要設置broker的數量,即server.properties的數量,每個properties代表一個broker,其中主要屬性有broker.id、port、log.dirs、zookeeper.connect。
2、話題分區
要大於broker數量,可以在創建話題的時候指定分區數,或者根據broker的配置默認分區數,如server.properties裡的num.partitions屬性,默認為1。如果是多broker集群,那麼分區的數量是多個server.properties裡num.partitions值的最大值,而不是值的累加。
3、複本
創建話題的時候,一般需要三個複本,用來保證可用性,不至於在一個broker崩潰時,導致該broker管理的partition不能使用而丟失數據,因為kafka會在複本裡找到follower選出新的leader。可以在創建話題的時候指定複本數 ,或者根據broker的配置默認複本數,如server.properties裡的default.replication.factor屬性,默認為1。
4、生產者
生產者是需要指向broker集群,
props.put("metadata.broker.list", "192.168.56.102:9092");
並且制定消息發送策略,這個策略決定了消息要發往哪個分區,只有充分利用所有分區,才能發揮broker集群的效果,
props.put("partitioner.class", "my.kafka.SimplePartitioner");
my.kafka.SimplePartitioner這個類的代碼如下:
package my.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
//必須實現kafka.producer.Partitioner
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {}
//實現這個方法
public int partition(Object key, int numPartitions) {
try {
//key是構造消息時候傳入的一個參數,numPartitions是指定話題的分區數
//這裡以取模的方式,來指定發往哪個分區
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
這時,在構造消息的時候,需要送入一個key,如
new KeyedMessage<string>("話題", "key", "消息")/<string>
如果不設置上述屬性,kafka會使用kafka.producer.DefaultPartitioner來作為分區選擇類,它的策略是返回Utils.abs(key.hashCode) % numPartitions。
5、 消費者
上述broker數量、話題分區數量的設定,以及生產者發送消息時對分區的選擇,都是消費者負載均衡的基礎。因為同一個消息,只能被同一個消費者群組裡的一個消費者消費,如果只有一個分區,或者生產者只往一個分區發送消息,那麼,同一個消費者群組裡,只有一個消費者能消費消息,而其它消費者永遠都不能消費,這是kafka的機制決定的。當設置了多個分區,則每個分區都會跟同一個消費者群組裡的一個或者多個消費者建立關係(參見“設計解析”第3點消費者群組的示意圖),這時同時往不同的分區發送消息,就可以讓多個消費者同時消費消息,達到並行或者多線程的效果,來提高效率。要注意的是,當同一個消費者群組裡的消費者數量大於分區的數量時,多餘的消費者同樣不會消費消息。設置同一個群組的方法是,設置props.put("group.id", "0"),值相同就標識在同一個群組。
下面這個例子更清晰地展示了Kafka Consumer Group的特性。首先創建一個Topic (名為topic1,包含3個Partition),然後創建一個屬於group1的Consumer實例,並創建三個屬於group2的Consumer實例,最後通過Producer向topic1發送key分別為1,2,3的消息。結果發現屬於group1的Consumer收到了所有的這三條消息,同時group2中的3個Consumer分別收到了key為1,2,3的消息。如下圖所示
6、 整體示意圖
Yahoo的管理工具
Yahoo開源了他們的kafka管理工具,可以在https://github.com/yahoo/kafka-manager獲取到源碼,根據其文檔構建工程。過程如下:
修改配置:
因為構建工程後,會把配置文件放入jar,所以一般要先修改完再構建。當然,也可以構建完再到jar裡找出相應的配置進行修改。修改Conf\\application.conf裡kafka-manager.zkhosts屬性,指向一個zookeeper集群,一般使用跟kafka同一個zookeeper集群。
構建/部署:
先下載一個sbt,是一個scala的構建工具,地址http://www.scala-sbt.org/。然後執行sbt clean dist,如果提示sbt不是命令,請將之前下載的sbt配置到path,或者/path/to/sbt clean dist,/path/to為sbt路徑。由於要求jdk要1.7以上,可以指定javahome,如/path/to/sbt -java-home /usr/local/oracle-java-8 dist clean。
啟動服務:
bin/kafka-manager,默認為9000端口,指定端口命令為bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080。如果要指定javahome,可以用bin/kafka-manager -java-home /usr/local/oracle-java-8。
服務地址:
http://IP:PORT/
進入後,要手動增加要監控的zookeeper集群,然後就可以監控在這個zookeeper集群註冊的kafka集群了。
注意事項
1、 配置
在配置正確的情況下,如果producer拋出消費發送失敗的異常,則需要修改以下配置:
/kafka/config/server.properties
#host.name=localhost修改為host.name=ip
即,開放host.name且修改localhost為當前ip
2、 生產者屬性說明
metadata.broker.list 代理列表,即kafka服務列表,值為ip+端口,可配置多個服務,用逗號分隔。
producer.type 生產者發送消息的方式,同步sync或者異步async ,默認是同步。異步情況下,如果發生連接異常,將可能丟失數據。
serializer.clas 消息的序列化類,默認是kafka.serializer.DefaultEncoder
partitioner.class 話題分區指定類,默認是kafka.producer.DefaultPartitioner,策略是基於key的hashcode與分區數的模。
3、 消費者屬性說明
group.id 消費者群組id,同一個id標識屬於同一個消費者群組
zookeeper.connect zookeeper集群,格式hostname1:port1,hostname2:port2
auto.commit.enable 是否自動提交偏移量(offset )到zookeeper,默認為true
auto.offset.reset 偏移量重置,即消費者要從哪裡開始消費,僅支持smallest(從頭開始消費)和largest(從最新的消息開始消費,默認),其它值將拋異常。
4、 Broker屬性說明
broker.id broker實例id,不能重複
log.dirs broker裡所有消息實例化文件的存放位置,默認為/tmp/kafka-logs,不同實例最好分開
port 端口,默認9092
zookeeper.connect 註冊到的zookeeper集群,格式hostname1:port1,hostname2:port2
host.name host,默認為null,有時生產者連接不上時,要改成ip
num.partitions 默認的話題分區數
log.retention.hours 話題裡的消息保留時間,過期將刪除,默認168小時(7天)
auto.create.topics.enable 話題不存在時,按默認的分區和複本設置,創建話題,默認true
default.replication.factor 默認複本數,默認值1
5、 zookeeper集群
dataDir zookeeper持久化文件(協調數據)位置,默認/tmp/zookeeper
clientPort 客戶端監聽端口,即客戶端需要訪問這個端口進行註冊
maxClientCnxns 客戶端連接數,默認為0,不限制
//以下為集群節點配置,需要在每個zookeeper服務裡配置,接下來需要根據server.X的號碼在相應的節點上的dataDir下建立myid文件,輸入自身的“X”作為內容即可,比如master節點上的myid只輸入1即可。
server.1=master:2888:3888
server.2=node1:2888:3888
server.3=node2:2888:3888
等待服務都啟動完畢後,zookeeper集群會選舉出其中一個服務作為leader。
6、低階消費者API
閱讀更多 IT資訊網 的文章