Kafka基礎知識大補

01 kafka基礎

Kafka概念

  • Kafka作為一個集群,運行在一臺或者多臺服務器上。
  • Kafka 通過 topic 對存儲的流數據進行分類。
  • 每條記錄中包含一個key,一個value和一個timestamp(時間戳)。

Kafka特性

  • 可以讓你發佈和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。
  • 可以儲存流式的記錄,並且有較好的容錯性。
  • 可以在流式記錄產生時就進行處理。

Kafka適用場景

  • 構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。 (mq)
  • 構建實時流式應用程序,對這些流數據進行轉換或者影響。 (就是流處理,通過kafka stream topic和topic之間內部進行變化)

Kafka使用場景

1.日誌收集

一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等。

2.消息系統

解耦和生產者和消費者、緩存消息等。

3.用戶活動跟蹤

Kafka經常被用來記錄web用戶或者app用戶的各種活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,然後訂閱者通過訂閱這些topic來做實時的監控分析,或者裝載到hadoop、數據倉庫中做離線分析和挖掘。

4.運營指標

Kafka也經常用來記錄運營監控數據。包括收集各種分佈式應用的數據,生產各種操作的集中反饋,比如報警和報告。

5.流式處理

比如spark streaming和storm

Kafka核心API

  • Producer 允許一個應用程序發佈一串流式的數據到一個或者多個Kafka topic。
  • Consumer 允許一個應用程序訂閱一個或多個 topic ,並且對發佈給他們的流式數據進行處理。
  • Streams 允許一個應用程序作為一個流處理器,消費一個或者多個topic產生的輸入流,然後生產一個輸出流到一個或多個topic中去,在輸入輸出流中進行有效的轉換。
  • Connector 允許構建並運行可重用的生產者或者消費者,將Kafka topics連接到已存在的應用程序或者數據系統。比如,連接到一個關係型數據庫,捕捉表(table)的所有變更內容。

Topics 與日誌

Topic 就是數據主題,是數據記錄發佈的地方,可以用來區分業務系統。Kafka中的Topics總是多訂閱者模式,一個topic可以擁有一個或者多個消費者來訂閱它的數據。

對於每一個topic, Kafka集群都會維持一個分區日誌。

每個分區都是有序且順序不可變的記錄集,並且不斷地追加到結構化的commit log文件。分區中的每一個記錄都會分配一個id號來表示順序,稱之為offset,offset用來唯一的標識分區中每一條記錄。

日誌中的 partition(分區)有以下幾個用途。

第一,當日志大小超過了單臺服務器的限制,允許日誌進行擴展。每個單獨的分區都必須受限於主機的文件限制,不過一個主題可能有多個分區,因此可以處理無限量的數據。

第二,可以作為並行的單元集—關於這一點。

Kafka基礎知識大補

Offset

每一個分區都是一個順序的、不可變的消息隊列,並且可以持續的添加。分區中的消息都被分配了一個序列號,稱之為偏移量(offset),在每個分區中此偏移量都是唯一的。

一個分區在文件系統裡存儲為一個文件夾。文件夾裡包含日誌文件和索引文件。其文件名是其包含的offset的最小的條目的offset。

Kafka基礎知識大補


Kafka基礎知識大補

offset

要查找offset為7的Message:首先是用二分查找確定它是在哪個LogSegment中,自然是在第一個Segment中。

打開這個Segment的index文件,也是用二分查找找到offset小於或者等於指定offset的索引條目中最大的那個offset。自然offset為6的那個索引是我們要找的,通過索引文件我們知道offset為6的Message在數據文件中的位置為9807。

打開數據文件,從位置為9807的那個地方開始順序掃描直到找到offset為7的那條Message。

這套機制是建立在offset是有序的。索引文件被映射到內存中,所以查找的速度還是很快的。

一句話,Kafka的Message存儲採用了分區(partition),分段(LogSegment)和稀疏索引這幾個手段來達到了高效性。

分佈式

日誌的分區partition (分佈)在Kafka集群的服務器上。每個服務器在處理數據和請求時,共享這些分區。每一個分區都會在已配置的服務器上進行備份,確保容錯性.

每個分區都有一臺 server 作為 “leader”,零臺或者多臺server作為 follwers 。leader server 處理一切對 partition (分區)的讀寫請求,而follwers只需被動的同步leader上的數據。當leader宕機了,followers 中的一臺服務器會自動成為新的 leader。每臺 server 都會成為某些分區的 leader 和某些分區的 follower,因此集群的負載是平衡的。

生產者

生產者可以將數據發佈到所選擇的topic(主題)中。生產者負責將記錄分配到topic的哪一個 partition(分區)中。可以使用循環的方式來簡單地實現負載均衡,也可以根據某些語義分區函數(例如:記錄中的key)來完成。

消費者

消費者使用一個 消費組 名稱來進行標識,發佈到topic中的每條記錄被分配給訂閱消費組中的一個消費者實例.消費者實例可以分佈在多個進程中或者多個機器上。

如果所有的消費者實例在同一消費組中,消息記錄會負載平衡到每一個消費者實例.

如果所有的消費者實例在不同的消費組中,每條消息記錄會廣播到所有的消費者進程。

Kafka基礎知識大補

如上圖所示, Kafka 集群有兩臺 server 的,四個分區(p0-p3)和兩個消費者組。消費組A有兩個消費者,消費組B有四個消費者。

通常情況下,每個 topic 都會有一些消費組,一個消費組對應一個"邏輯訂閱者"。一個消費組由許多消費者實例組成,便於擴展和容錯。這就是發佈和訂閱的概念,只不過訂閱者是一組消費者而不是單個的進程。

在Kafka中實現消費的方式是將日誌中的分區劃分到每一個消費者實例上,以便在任何時間,每個實例都是分區唯一的消費者。維護消費組中的消費關係由Kafka協議動態處理。如果新的實例加入組,他們將從組中其他成員處接管一些 partition 分區;如果一個實例消失,擁有的分區將被分發到剩餘的實例。

Kafka 只保證分區內的記錄是有序的,而不保證主題中不同分區的順序。每個 partition 分區按照key值排序足以滿足大多數應用程序的需求。但如果你需要總記錄在所有記錄的上面,可使用僅有一個分區的主題來實現,這意味著每個消費者組只有一個消費者進程。

Kafka作為消息系統

消費組在Kafka有兩層概念。在隊列中,消費組允許你將處理過程分發給一系列進程(消費組中的成員)。 在發佈訂閱中,Kafka允許你將消息廣播給多個消費組。

Kafka的優勢在於每個topic都有以下特性—可以擴展處理並且允許多訂閱者模式—不需要只選擇其中一個。

Kafka 設計的更好。topic中的partition是一個並行的概念。 Kafka能夠為一個消費者池提供順序保證和負載平衡,是通過將topic中的partition分配給消費者組中的消費者來實現的, 以便每個分區由消費組中的一個消費者消耗。通過這樣,我們能夠確保消費者是該分區的唯一讀者,並按順序消費數據。 眾多分區保證了多個消費者實例間的負載均衡。但請注意,消費者組中的消費者實例個數不能超過分區的數量。

Kafka 作存儲系統

數據寫入Kafka後被寫到磁盤,並且進行備份以便容錯。直到完全備份,Kafka才讓生產者認為完成寫入,即使寫入失敗Kafka也會確保繼續寫入。

Kafka用做流處理

Kafka流處理器不斷地從輸入的topic獲取流數據,處理數據後,再不斷生產流數據到輸出的topic中去。

批處理

將消息、存儲和流處理結合起來,使得Kafka看上去不一般,但這是它作為流平臺所備的。

Kafka 配置介紹

Broker 核心配置

  • broker.id

用於服務的broker id。如果沒設置,將生存一個唯一broker id。為了避免ZooKeeper生成的id和用戶配置的broker id相沖突,生成的id將在reserved.broker.max.id的值基礎上加1。

  • log.dirs

保存日誌數據的目錄,如果未設置將使用log.dir的配置。

  • zookeeper.connect

Zookeeper主機地址

Topic級別配置

與Topic相關的配置既包含服務器默認值,也包含可選的每個Topic覆蓋值。 如果沒有給出每個Topic的配置,那麼服務器默認值就會被使用。 通過提供一個或多個 --config 選項,可以在創建Topic時設置覆蓋值。

  • 使用自定義的最大消息大小和刷新率創建了一個名為 my-topic 的topic:

> bin/kafka-topics.sh

--zookeeper localhost:2181

--create --topic my-topic --partitions 1

--replication-factor 1

--config max.message.bytes=64000

--config flush.messages=1

  • 可以在使用alter configs命令稍後更改或設置覆蓋值. 本示例重置my-topic的最大消息的大小:

> bin/kafka-configs.sh

--zookeeper localhost:2181

--entity-type topics

--entity-name my-topic

--alter

--add-config max.message.bytes=128000

......

Producer 配置

比較核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

## 消費者獲取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面設置一個vip

metadata.broker.list

##消息的確認模式

##0:不保證消息的到達確認,只管發送,低延遲但是會出現消息的丟失,在某個server失敗的情況下,有點像TCP

##1:發送消息,並會等待leader 收到確認後,一定的可靠性

## -1:發送消息,等待leader收到確認,並進行復制操作後,才返回,最高的可靠性

request.required.acks =0

## 消息發送的最長等待時間

request.timeout.ms =10000

## socket的緩存大小

send.buffer.bytes=100*1024

## key的序列化方式,若是沒有設置,同serializer.class

key.serializer.class

## 分區的策略,默認是取模

partitioner.class=kafka.producer.DefaultPartitioner

## 消息的壓縮模式,默認是none,可以有gzip和snappy

compression.codec = none

## 可以針對默寫特定的topic進行壓縮

compressed.topics=null

## 消息發送失敗後的重試次數

message.send.max.retries =3

## 每次失敗後的間隔時間

retry.backoff.ms =100

## 生產者定時更新topic元信息的時間間隔 ,若是設置為0,那麼會在每個消息發送後都去更新數據

topic.metadata.refresh.interval.ms =600*1000

## 用戶隨意指定,但是不能重複,主要用於跟蹤記錄消息

client.id=""

消息模式 相關

## 生產者的類型 async:異步執行消息的發送 sync:同步執行消息的發送

producer.type=sync

## 異步模式下,那麼就會在設置的時間緩存消息,並一次性發送

queue.buffering.max.ms =5000

## 異步的模式下 最長等待的消息數

queue.buffering.max.messages =10000

## 異步模式下,進入隊列的等待時間 若是設置為0,那麼要麼進入隊列,要麼直接拋棄

queue.enqueue.timeout.ms = -1

## 異步模式下,每次發送的最大消息數,前提是觸發了queue.buffering.max.messages或是queue.buffering.max.ms的限制

batch.num.messages=200

## 消息體的系列化處理類 ,轉化為字節流進行傳輸

serializer.class= kafka.serializer.DefaultEncoder

Consumer 配置

最為核心的配置是group.id、zookeeper.connect

## Consumer歸屬的組ID,broker是根據group.id來判斷是隊列模式還是發佈訂閱模式,非常重要

group.id

## 消費者的ID,若是沒有設置的話,會自增

consumer.id

## 一個用於跟蹤調查的ID ,最好同group.id相同

client.id = group id value

## 對於zookeeper集群的指定,可以是多個 hostname1:port1,hostname2:port2,hostname3:port3 必須和broker使用同樣的zk配置

zookeeper.connect=localhost:2182

## zookeeper的心跳超時時間,查過這個時間就認為是dead消費者

zookeeper.session.timeout.ms =6000

## zookeeper的等待連接時間

zookeeper.connection.timeout.ms =6000

## zookeeper的follower同leader的同步時間

zookeeper.sync.time.ms =2000

## 當zookeeper中沒有初始的offset時候的處理方式 。smallest :重置為最小值 largest:重置為最大值 anythingelse:拋出異常

auto.offset.reset = largest

## socket的超時時間,實際的超時時間是:max.fetch.wait + socket.timeout.ms.

socket.timeout.ms=30*1000

## socket的接受緩存空間大小

socket.receive.buffer.bytes=64*1024

##從每個分區獲取的消息大小限制

fetch.message.max.bytes =1024*1024

## 是否在消費消息後將offset同步到zookeeper,當Consumer失敗後就能從zookeeper獲取最新的offset

auto.commit.enable =true

## 自動提交的時間間隔

auto.commit.interval.ms =60*1000

## 用來處理消費消息的塊,每個塊可以等同於fetch.message.max.bytes中數值

queued.max.message.chunks =10

## 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新

## 的consumer上,如果一個consumer獲得了某個partition的消費權限,那麼它將會向zk註冊

##"Partition Owner registry"節點信息,但是有可能此時舊的consumer尚沒有釋放此節點,

## 此值用於控制,註冊節點的重試次數.

rebalance.max.retries =4

## 每次再平衡的時間間隔

rebalance.backoff.ms =2000

## 每次重新選舉leader的時間

refresh.leader.backoff.ms

## server發送到消費端的最小數據,若是不滿足這個數值則會等待,知道滿足數值要求

fetch.min.bytes =1

## 若是不滿足最小大小(fetch.min.bytes)的話,等待消費端請求的最長等待時間

fetch.wait.max.ms =100

## 指定時間內沒有消息到達就拋出異常,一般不需要改

consumer.timeout.ms = -1

Kafka Connect 配置

Kafka Streams 配置

AdminClient 配置



02 Kafka架構

一個典型的 Kafka 體系架構包括若干 Producer(消息生產者),若干 broker(作為 Kafka 節點的服務器),若干 Consumer(Group),以及一個 ZooKeeper 集群。Kafka通過 ZooKeeper 管理集群配置、選舉 Leader 以及在 consumer group 發生變化時進行 Rebalance(即消費者負載均衡,在下一課介紹)。Producer 使用 push(推)模式將消息發佈到 broker,Consumer 使用 pull(拉)模式從 broker 訂閱並消費消息。

下圖:一個總體架構,並沒有對作為 Kafka 節點的 broker 進行深入刻畫,事實上,它的內部細節相當複雜,如下圖所示,Kafka 節點涉及 Topic、Partition 兩個重要概念。

Kafka基礎知識大補

kafka總體架構


Kafka基礎知識大補

Broker:Kafka的broker是無狀態的,broker使用Zookeeper維護集群的狀態。Leader的選舉也由Zookeeper負責。

Zookeeper:Zookeeper負責維護和協調broker。當Kafka系統中新增了broker或者某個broker發生故障失效時,由ZooKeeper通知生產者和消費者。生產者和消費者依據Zookeeper的broker狀態信息與broker協調數據的發佈和訂閱任務。

Producer:生產者將數據推送到broker上,當集群中出現新的broker時,所有的生產者將會搜尋到這個新的broker,並自動將數據發送到這個broker上。

Consumer:因為Kafka的broker是無狀態的,所以consumer必須使用partition offset來記錄消費了多少數據。如果一個consumer指定了一個topic的offset,意味著該consumer已經消費了該offset之前的所有數據。consumer可以通過指定offset,從topic的指定位置開始消費數據。consumer的offset存儲在Zookeeper中。

Kafka基礎知識大補

Kafka 基礎架構


03 Kafka環境搭建及相關命令

  • 安裝zookeeper
  • 下載代碼並解壓

> tar -xzf kafka_2.11-1.0.0.tgz

> cd kafka_2.11-1.0.0

  • 啟動服務器zk和kafka

>bin/zookeeper-server-start.sh config/zookeeper.properties

>bin/kafka-server-start.sh config/server.properties

  • 創建一個 topic 名為“test”的topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  • 發送一些消息

Kafka自帶一個命令行客戶端,運行 producer,然後在控制檯輸入一些消息以發送到服務器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

這是一條kafka測試消息

  • 啟動一個 consumer

命令行consumer(消費者),將消息轉儲到標準輸出。

>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

這是一條kafka測試消息

  • 列出所有Topic

bin/kafka-topics.sh --list --zookeeper 10.0.178.247:2181/kafka_test

  • 查看topic詳情

bin/kafka-topics.sh --describe --zookeeper 10.0.178.247:2181/kafka_test --topic test_lyb

04 Java kafka實例

java客戶端使用

生產者創建

Properties kafkaProperties = new Properties();

kafkaProperties.put("bootstrap.servers", "host1:9092,host2:9092,host3:9093");

kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<string> kafkaProducer = new KafkaProducer<>(kafkaProperties);/<string>

消息發送 發送消息一共分為三種模式

  • 發送即忘記
  • 同步發送
  • 異步發送

同步即忘記

/**

* 發送消息並忘記,雖然會進行重試,但是還是會丟失數據而客戶端並不知道

*/

@Test

public void sendMessageAndForget() {

ProducerRecord<string> producerRecord = new ProducerRecord<>(TOPIC_TEST, "test_message", "this test message");/<string>

kafkaProducer.send(producerRecord);

}

同步發送

/**

* 同步發送消息,通過send方法獲取到future,當發送失敗後,get()方法會返回異常

* @throws ExecutionException

* @throws InterruptedException

*/

@Test

public void sendMessageWithSync() throws ExecutionException, InterruptedException {

ProducerRecord<string> producerRecord = new ProducerRecord<>(TOPIC_TEST, "test_message", "this test message");/<string>

Future<recordmetadata> future = kafkaProducer.send(producerRecord);/<recordmetadata>

RecordMetadata recordMetadata = future.get();

System.out.println("record offset:" + recordMetadata.offset());

}

異步發送

/**

* 異步發送消息,通過在send方法中,增加一個回調函數,能夠知道發送結果

* @throws InterruptedException

*/

@Test

public void sendMessageWithAsync() throws InterruptedException {

ProducerRecord<string> producerRecord = new ProducerRecord<>(TOPIC_TEST, "test_message", "this test message");/<string>

kafkaProducer.send(producerRecord, new Callback() {

@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {

System.out.println(metadata.offset());

}

});

TimeUnit.SECONDS.sleep(3);

}

05 幾種信息系統比較(ActiveMQ、Kafka、RabbitMQ)

Kafka基礎知識大補



分享到:


相關文章: