Kafka知識整理

Kafka概述

Kafka是一個分佈式的基於發佈/訂閱模式的

消息隊列,主要應用於大數據實時處理領域。

消息隊列

消息隊列中間件是分佈式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題

實現高性能,高可用,可伸縮和最終一致性架構

使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

作用主要有:

解耦,冗餘,擴展性,靈活性&峰值處理能力,可恢復性,順序保證性,緩衝,異步通信等。

1)解耦:

  允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。

2)冗餘:

消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所採用的"插入-獲取-刪除"範式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。

3)擴展性:

因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。

4)靈活性 & 峰值處理能力:

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

5)可恢復性:

系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復後被處理。

6)順序保證:

在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,並且能保證數據會按照特定的順序來處理。(Kafka保證一個Partition內的消息的有序性)

7)緩衝:

有助於控制和優化數據流經過系統的速度,解決生產消息和消費消息的處理速度不一致的情況。

8)異步通信:

很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但並不立即處理它。想向隊列中放入多少消息就放多少,然後在需要的時候再去處理它們。

消息隊列的兩種模式

(1)點對點模式(一對一,消費者主動拉取數據,消息收到後消息清除)

消息生產者生產消息發送到Queue中,然後消息消費者從Queue中取出並且消費消息。

消息被消費以後,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費。

Kafka知識整理

(2)發佈/訂閱模式(一對多,消費者消費數據之後不會清除消息)

消息生產者(發佈)將消息發佈到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發佈到topic的消息會被所有訂閱者消費。

Kafka知識整理

Kafka架構

Kafka是一個分佈式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)稱為broker。無論是kafka集群,還是consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。

Kafka知識整理

Kafka知識整理

1)Producer :消息生產者,就是向kafka broker發消息的客戶端;

2)Consumer :消息消費者,向kafka broker取消息的客戶端;

3)Topic :可以理解為一個隊列;

4) Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會複製(不是真的複製,是概念上的)到所有的CG,但每個partion只會把消息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次發送消息到不同的topic;

5)Broker :一臺kafka服務器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic;

6)Partition:為了實現擴展性,一個非常大的topic可以分佈到多個broker(即服務器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發給consumer,不保證一個topic的整體(多個partition間)的順序;

7)Offset:kafka的存儲文件都是按照offset.kafka來命名,用offset做名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.kafka的文件即可。當然the first offset就是00000000000.kafka。

8)Leader:每個分區多個副本的“主”,生產者發送數據的對象,以及消費者消費數據的對象都是leader。

9)Follower:每個分區多個副本中的“從”,實時從leader中同步數據,保持和leader數據的同步。leader發生故障時,某個follower會成為新的follower。

10)Replica:副本,為保證集群中的某個節點發生故障時,該節點上的partition數據不丟失,且kafka仍然能夠繼續工作,kafka提供了副本機制,一個topic的每個分區都有若干個副本,一個leader和若干個follower

Kafka安裝部署

Kafka下載:http://kafka.apache.org/downloads.html

集群安排:hadoop102/zk/kafka,hadoop103/zk/kafka,hadoop104/zk/kafka。

解壓安裝包:

<code>tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module//<code>

修改解壓後的文件名稱:

<code>mv kafka_2.11-0.11.0.0/ kafka/<code>

在/opt/module/kafka目錄下創建logs文件夾:

<code>mkdir logs/<code>

修改配置文件:

<code>cd config/
vi server.properties/<code>

輸入以下內容:

<code>#broker的全局唯一編號,不能重複
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的現成數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka運行日誌存放的路徑	
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/<code>

配置環境變量:

<code>sudo vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile/<code>

分發安裝包:

<code>xsync kafka//<code>

注意:分發之後記得配置其他機器的環境變量

分別在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties

中的broker.id=1、broker.id=2。

啟動集群:

依次在hadoop102、hadoop103、hadoop104節點上啟動kafka

<code> bin/kafka-server-start.sh config/server.properties &/<code>

關閉集群:

<code>bin/kafka-server-stop.sh stop/<code>

Kafka命令行操作

查看當前服務器中的所有topic:

<code> bin/kafka-topics.sh --zookeeper hadoop102:2181 --list/<code>

創建topic:

<code>bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic first/<code>

選項說明:

--topic 定義topic名

--replication-factor 定義副本數

--partitions 定義分區數

刪除topic:

<code>bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic first/<code>

需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。

發送消息:

<code>bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first
>hello world/<code>

消費消息:

<code>bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic first/<code>

--from-beginning:會把first主題中以往所有的數據都讀取出來。根據業務場景選擇是否增加該配置。

查看某個Topic的詳情:

<code>bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic first/<code>

Kafka生產過程分析

kafka工作流程:


Kafka知識整理

Kafka中消息是以topic進行分類的,生產者生產消息,消費者消費消息,都是面向topic的。

topic是邏輯上的概念,而partition是物理上的概念,每個partition對應於一個log文件,該log文件中存儲的就是producer生產的數據。Producer生產的數據會被不斷追加到該log文件末端,且每條數據都有自己的offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset,以便出錯恢復時,從上次的位置繼續消費。

Kafka知識整理

由於生產者生產的消息會不斷追加到log文件末尾,為防止log文件過大導致數據定位效率低下,Kafka採取了分片索引機制,將每個partition分為多個segment。每個segment對應兩個文件——“.index”文件和“.log”文件。這些文件位於一個文件夾下,該文件夾的命名規則為:topic名稱+分區序號。例如,first這個topic有三個分區,則其對應的文件夾為first-0,first-1,first-2。

<code>00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log/<code>

index和log文件以當前segment的第一條消息的offset命名。下圖為index文件和log文件的結構示意圖。

Kafka知識整理

“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數據,索引文件中的元數據指向對應數據文件中message的物理偏移地址。

Kafka生產者

分區策略

1)分區的原因

(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;

(2)可以提高併發,因為可以以Partition為單位讀寫了。

2)分區的原則

我們需要將producer發送的數據封裝成一個ProducerRecord對象。

Kafka知識整理

(1)指明 partition 的情況下,直接將指明的值直接作為 partiton 值;

(2)沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取餘得到 partition 值;

(3)既沒有 partition 值又沒有 key 值的情況下,第一次調用時隨機生成一個整數(後面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 總數取餘得到 partition 值,也就是常說的 round-robin 算法。

數據可靠性保證

為保證producer發送的數據,能可靠的發送到指定的topic,topic的每個partition收到producer發送的數據後,都需要向producer發送ack(acknowledgement確認收到),如果producer收到ack,就會進行下一輪的發送,否則重新發送數據。

Kafka知識整理

1)副本數據同步策略

Kafka知識整理

Kafka選擇了第二種方案,原因如下:

1.同樣為了容忍n臺節點的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分區都有大量的數據,第一種方案會造成大量數據的冗餘。

2.雖然第二種方案的網絡延遲會比較高,但網絡延遲對Kafka的影響較小。

2)ISR

採用第二種方案之後,設想以下情景:leader收到數據,所有follower都開始同步數據,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能發送ack。這個問題怎麼解決呢?

Leader維護了一個動態的in-sync replica set (ISR),意為和leader保持同步的follower集合。當ISR中的follower完成數據的同步之後,leader就會給follower發送ack。如果follower長時間未向leader同步數據,則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數設定。Leader發生故障之後,就會從ISR中選舉新的leader。

3)ack應答機制

對於某些不太重要的數據,對數據的可靠性要求不是很高,能夠容忍數據的少量丟失,所以沒必要等ISR中的follower全部接收成功。

所以Kafka為用戶提供了三種可靠性級別,用戶根據對可靠性和延遲的要求進行權衡,選擇以下的配置。

acks參數配置:

acks

0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經返回,當broker故障時有可能丟失數據

1:producer等待broker的ack,partition的leader落盤成功後返回ack,如果在follower同步成功之前leader故障,那麼將會丟失數據

Kafka知識整理

-1(all):producer等待broker的ack,partition的leader和follower全部落盤成功後才返回ack。但是如果在follower同步完成後,broker發送ack之前,leader發生故障,那麼會造成

數據重複

Kafka知識整理

4)故障處理細節

Kafka知識整理

(1)follower故障

follower發生故障後會被臨時踢出ISR,待該follower恢復後,follower會讀取本地磁盤記錄的上次的HW,並將log文件高於HW的部分截取掉,從HW開始向leader進行同步。等該follower的LEO大於等於該Partition的HW,即follower追上leader之後,就可以重新加入ISR了。

(2)leader故障

leader發生故障之後,會從ISR中選出一個新的leader,之後,為保證多個副本之間的數據一致性,其餘的follower會先將各自的log文件高於HW的部分截掉,然後從新的leader同步數據。

注意:這隻能保證副本之間的數據一致性,並不能保證數據不丟失或者不重複。

Exactly Once語義

對於某些比較重要的消息,我們需要保證exactly once語義,即保證每條消息被髮送且僅被髮送一次。

在0.11版本之後,Kafka引入了冪等性機制(idempotent),配合acks = -1時的at least once語義,實現了producer到broker的exactly once語義。

idempotent + at least once = exactly once

使用時,只需將enable.idempotence屬性設置為true,kafka自動將acks屬性設為-1。

Kafka消費者

消費方式

consumer採用pull(拉)模式從broker中讀取數據。

push(推)模式很難適應消費速率不同的消費者,因為消息發送速率是由broker決定的。它的目標是儘可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費消息。

pull模式不足之處是,如果kafka沒有數據,消費者可能會陷入循環中,一直返回空數據。針對這一點,Kafka的消費者在消費數據時會傳入一個時長參數timeout,如果當前沒有數據可供消費,consumer會等待一段時間之後再返回,這段時長即為timeout。

分區分配策略

一個consumer group中有多個consumer,一個 topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費。

Kafka有兩種分配策略,一是round robin,一是range。

roundrobin:輪詢,range:範圍

offset的維護

由於consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復後,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復後繼續消費。

Kafka 0.9版本之前,consumer默認將offset保存在Zookeeper中,從0.9版本開始,consumer默認將offset保存在Kafka一個內置的topic中,該topic為__consumer_offsets

Kafka 高效讀寫數據

1)順序寫磁盤

Kafka的producer生產數據,要寫入到log文件中,寫的過程是一直追加到文件末端,為順序寫。官網有數據表明,同樣的磁盤,順序寫能到到600M/s,而隨機寫只有100k/s。這與磁盤的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間。

2)零複製技術

kafka中的消費者在讀取服務端的數據時,需要將服務端的磁盤文件通過網絡發送到消費者進程,網絡發送需要經過幾種網絡節點。如下圖所示:

Kafka知識整理

傳統的讀取文件數據併發送到網絡的步驟如下:
(1)操作系統將數據從磁盤文件中讀取到內核空間的頁面緩存;
(2)應用程序將數據從內核空間讀入用戶空間緩衝區;
(3)應用程序將讀到數據寫回內核空間並放入socket緩衝區;


(4)操作系統將數據從socket緩衝區複製到網卡接口,此時數據才能通過網絡發送。


通常情況下,Kafka的消息會有多個訂閱者,生產者發佈的消息會被不同的消費者多次消費,為了優化這個流程,Kafka使用了“零拷貝技術”,如下圖所示:

Kafka知識整理

“零拷貝技術”只用將磁盤文件的數據複製到頁面緩存中一次,然後將數據從頁面緩存直接發送到網絡中(發送給不同的訂閱者時,都可以使用同一個頁面緩存),避免了重複複製操作。

如果有10個消費者,傳統方式下,數據複製次數為4*10=40次,而使用“零拷貝技術”只需要1+10=11次,一次為從磁盤複製到頁面緩存,10次表示10個消費者各自讀取一次頁面緩存。

Zookeeper在Kafka中的作用

Kafka集群中有一個broker會被選舉為Controller,負責管理集群broker的上下線,所有topic的分區副本分配和leader選舉等工作。

Controller的管理工作都是依賴於Zookeeper的。

以下為partition的leader選舉過程:

Kafka知識整理

Kafka API

Producer API

消息發送流程

Kafka的Producer發送消息採用的是異步發送的方式。在消息發送的過程中,涉及到了兩個線程——main線程和Sender線程,以及一個線程共享變量——RecordAccumulator。main線程將消息發送給RecordAccumulator,Sender線程不斷從RecordAccumulator中拉取消息發送到Kafka broker。

Kafka知識整理

相關參數:

batch.size:只有數據積累到batch.size之後,sender才會發送數據。

linger.ms:如果數據遲遲未達到batch.size,sender等待linger.time之後就會發送數據。

異步發送API

1)導入依賴

<code>
org.apache.kafka
kafka-clients
0.11.0.0
/<code>

2)編寫代碼

需要用到的類:

KafkaProducer:需要創建一個生產者對象,用來發送數據

ProducerConfig:獲取所需的一系列配置參數

ProducerRecord:每條數據都要封裝成一個ProducerRecord對象

1.不帶回調函數的API

<code>import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord("first", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}/<code>

2.帶回調函數的API

回調函數會在producer收到ack時調用,為異步調用,該方法有兩個參數,分別是RecordMetadata和Exception,如果Exception為null,說明消息發送成功,如果Exception不為null,說明消息發送失敗。

注意:消息發送失敗會自動重試,不需要我們在回調函數中手動重試。

<code>import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                //回調函數,該方法會在Producer收到ack時調用,為異步調用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}/<code>

同步發送API

同步發送的意思就是,一條消息發送之後,會阻塞當前線程,直至返回ack。

由於send方法返回的是一個Future對象,根據Futrue對象的特點,我們也可以實現同步發送的效果,只需在調用Future對象的get方法即可。

<code>import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord("first", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}
/<code>

Consumer API

Consumer消費數據時的可靠性是很容易保證的,因為數據在Kafka中是持久化的,故不用擔心數據丟失問題。

由於consumer在消費過程中可能會出現斷電宕機等故障,consumer恢復後,需要從故障前的位置的繼續消費,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復後繼續消費。

所以offset的維護是Consumer消費數據是必須考慮的問題。

手動提交offset

1)導入依賴

<code>
   org.apache.kafka
   kafka-clients
   0.11.0.0
/<code>

2)編寫代碼

需要用到的類:

KafkaConsumer:需要創建一個消費者對象,用來消費數據

ConsumerConfig:獲取所需的一系列配置參數

ConsuemrRecord:每條數據都要封裝成一個ConsumerRecord對象

<code>import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");//消費者組,只要group.id相同,就屬於同一個消費者組
        props.put("enable.auto.commit", "false");//自動提交offset
       
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }
    }
}
/<code>

3)代碼分析:

手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次poll的一批數據最高的偏移量提交;不同點是,commitSync會失敗重試,一直到提交成功(如果由於不可恢復原因導致,也會提交失敗);而commitAsync則沒有失敗重試機制,故有可能提交失敗。

4)數據重複消費問題

Kafka知識整理

自動提交offset

為了使我們能夠專注於自己的業務邏輯,Kafka提供了自動提交offset的功能。

自動提交offset的相關參數:

enable.auto.commit:是否開啟自動提交offset功能

auto.commit.interval.ms:自動提交offset的時間間隔

以下為自動提交offset的代碼:

<code>import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            for (ConsumerRecord record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}/<code>

自定義Interceptor

攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯。

對於producer而言,interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定製化需求,比如修改消息等。同時,producer允許用戶指定多個interceptor按序作用於同一條消息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

(1)configure(configs)

獲取配置信息和初始化數據時調用。

(2)onSend(ProducerRecord):

該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區前調用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區,否則會影響目標分區的計算。

(3)onAcknowledgement(RecordMetadata, Exception):

該方法會在消息從RecordAccumulator成功發送到Kafka Broker之後,或者在發送過程中失敗時調用。並且通常都是在producer回調邏輯觸發之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率。

(4)close:

關閉interceptor,主要用於執行一些資源清理工作

如前所述,interceptor可能被運行在多個線程中,因此在具體實現時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調用它們,並僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日誌中而非在向上傳遞。這在使用過程中要特別留意。

攔截器案例

1)需求:

實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發送後更新成功發送消息數或失敗發送消息數。

Kafka知識整理

2)案例實操

(1)增加時間戳攔截器

<code>import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor {

	@Override
	public void configure(Map configs) {

	}

	@Override
	public ProducerRecord onSend(ProducerRecord record) {
		// 創建一個新的record,把時間戳寫入消息體的最前部
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + "," + record.value().toString());
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

	}

	@Override
	public void close() {

	}
}/<code>

(2)統計發送消息成功和發送失敗消息數,並在producer關閉時打印這兩個計數器

<code>import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor{
    private int errorCounter = 0;
    private int successCounter = 0;

	@Override
	public void configure(Map configs) {
		
	}

	@Override
	public ProducerRecord onSend(ProducerRecord record) {
		 return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		// 統計成功和失敗的次數
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
	}

	@Override
	public void close() {
        // 保存結果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
	}
}/<code>

(3)producer主程序

<code>import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

	public static void main(String[] args) throws Exception {
		// 1 設置配置信息
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop102:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("batch.size", 16384);
		props.put("linger.ms", 1);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 構建攔截鏈
		List interceptors = new ArrayList<>();
		interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor"); 	interceptors.add("com.atguigu.kafka.interceptor.CounterInterceptor"); 
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		 
		String topic = "first";
		Producer producer = new KafkaProducer<>(props);
		
		// 3 發送消息
		for (int i = 0; i < 10; i++) {
			
		    ProducerRecord record = new ProducerRecord<>(topic, "message" + i);
		    producer.send(record);
		}
		 
		// 4 一定要關閉producer,這樣才會調用interceptor的close方法
		producer.close();
	}
}/<code>

3)測試

(1)在kafka上啟動消費者,然後運行客戶端java程序。

<code>bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic first

1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9/<code>

Flume對接Kafka

1)配置flume(flume-kafka.conf)

<code># define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1/<code>

2) 啟動kafkaIDEA消費者

3) 進入flume根目錄下,啟動flume

<code>$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf/<code>

4) 向 /opt/module/datas/flume.log裡追加數據,查看kafka消費者消費情況

<code>$ echo hello >> /opt/module/datas/flume.log/<code>

Kafka監控

Kafka Monitor

1.上傳jar包KafkaOffsetMonitor-assembly-0.4.6.jar到集群

2.在/opt/module/下創建kafka-offset-console文件夾

3.將上傳的jar包放入剛創建的目錄下

4.在/opt/module/kafka-offset-console目錄下創建啟動腳本start.sh,內容如下:

<code>#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &/<code>

5.在/opt/module/kafka-offset-console目錄下創建mobile-logs文件夾

<code>mkdir /opt/module/kafka-offset-console/mobile-logs/<code>

6.啟動KafkaMonitor

<code>./start.sh/<code>

7.登錄頁面hadoop102:8086端口查看詳情

Kafka Manager

1.上傳壓縮包kafka-manager-1.3.3.15.zip到集群

2.解壓到/opt/module

3.修改配置文件conf/application.conf

<code>kafka-manager.zkhosts="kafka-manager-zookeeper:2181"/<code>

修改為:

<code>kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"/<code>

4.啟動kafka-manager

<code>bin/kafka-manager/<code>

5.登錄hadoop102:9000頁面查看詳細信息

Kafka Streams

Kafka Streams。Apache Kafka開源項目的一個組成部分。是一個功能強大,易於使用的庫。用於在Kafka上構建高可分佈式、拓展性,容錯的應用程序。

為什麼要有Kafka Stream

當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有Spark Streaming和Apache Storm。Apache Storm發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持SQL on Stream。而Spark Streaming基於Apache Spark,可以非常方便與圖計算,SQL處理等集成,功能強大,對於熟悉其它Spark應用開發的用戶而言使用門檻低。另外,目前主流的Hadoop發行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。

既然Apache Spark與Apache Storm擁有如此多的優勢,那為何還需要Kafka Stream呢?主要有如下原因。

第一,Spark和Storm都是流式處理框架,而Kafka Stream提供的是一個基於Kafka的流式處理類庫。框架要求開發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,並且使用受限。而Kafka Stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。

Kafka知識整理

第二,雖然Cloudera與Hortonworks方便了Storm和Spark的部署,但是這些框架的部署仍然相對複雜。而Kafka Stream作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。

第三,就流式處理系統而言,基本都支持Kafka作為數據源。例如Storm具有專門的kafka-spout,而Spark也提供專門的spark-streaming-kafka模塊。事實上,Kafka基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了Kafka,此時使用Kafka Stream的成本非常低。

第四,使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對於應用實例而言,框架本身也會佔用部分資源,如Spark Streaming需要為shuffle和storage預留內存。但是Kafka作為類庫不佔用系統資源。

第五,由於Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。

第六,由於Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整並行度。

Kafka Stream數據清洗案例

0)需求:

實時處理單詞帶有”>>>”前綴的內容。例如輸入”atguigu>>>ximenqing”,最終處理成“ximenqing”

1)需求分析:

Kafka知識整理

2)案例實操

(1)創建一個工程,並添加jar包

(2)創建主類

<code>import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

	public static void main(String[] args) {

		// 定義輸入的topic
        String from = "first";
        // 定義輸出的topic
        String to = "second";

        // 設置參數
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        StreamsConfig config = new StreamsConfig(settings);

        // 構建拓撲
        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("SOURCE", from)
               .addProcessor("PROCESS", new ProcessorSupplier() {

					@Override
					public Processor get() {
						// 具體分析處理
						return new LogProcessor();
					}
				}, "SOURCE")
                .addSink("SINK", to, "PROCESS");

        // 創建kafka stream
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
	}
}
(3)具體業務處理
package com.atguigu.kafka.stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor {
	
	private ProcessorContext context;
	
	@Override
	public void init(ProcessorContext context) {
		this.context = context;
	}

	@Override
	public void process(byte[] key, byte[] value) {
		String input = new String(value);
		
		// 如果包含“>>>”則只保留該標記後面的內容
		if (input.contains(">>>")) {
			input = input.split(">>>")[1].trim();
			// 輸出到下一個topic
			context.forward("logProcessor".getBytes(), input.getBytes());
		}else{
			context.forward("logProcessor".getBytes(), input.getBytes());
		}
	}

	@Override
	public void punctuate(long timestamp) {
		
	}

	@Override
	public void close() {
		
	}
}/<code>

(4)運行程序

(5)在hadoop104上啟動生產者

<code>bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic first

>hello>>>world
/<code>

(6)在hadoop103上啟動消費者

<code>bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic second

world
hahaha/<code>

Kafka配置信息

Broker配置信息

broker.id:必填參數,broker的唯一標識

log.dirs:默認值:/tmp/kafka-logs。Kafka數據存放的目錄。可以指定多個目錄,中間用逗號分隔,當新partition被創建的時會被存放到當前存放partition最少的目錄。

port:默認值:9092。BrokerServer接受客戶端連接的端口號。

zookeeper.connect:Zookeeper的連接串,格式為:hostname1:port1,hostname2:port2,hostname3:port3。可以填一個或多個,為了提高可靠性,建議都填上。注意,此配置允許我們指定一個zookeeper路徑來存放此kafka集群的所有數據,為了與其他應用集群區分開,建議在此配置中指定本集群存放目錄,格式為:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消費者的參數要和此參數一致。

message.max.bytes:默認值:1000000。服務器可以接收到的最大的消息大小。注意此參數要和consumer的maximum.message.size大小一致,否則會因為生產者生產的消息太大導致消費者無法消費。

num.io.threads:默認值:8。服務器用來執行讀寫請求的IO線程數,此參數的數量至少要等於服務器上磁盤的數量。

queued.max.requests:默認值:500。I/O線程可以處理請求的隊列大小,若實際請求數超過此大小,網絡線程將停止接收新的請求。

socket.send.buffer.bytes

:默認值:100 * 1024。The SO_SNDBUFF buffer the server prefers for socket connections。

socket.request.max.bytes:默認值:100 * 1024 * 1024。服務器允許請求的最大值, 用來防止內存溢出,其值應該小於 Java heap size。

num.partitions:默認值:1。默認partition數量,如果topic在創建時沒有指定partition數量,默認使用此值,建議改為5。

log.segment.bytes:1024 * 1024 *1024。Segment文件的大小,超過此值將會自動新建一個segment,此值可以被topic級別的參數覆蓋。

log.roll.{ms,hours}:默認值:24 * 7 hours。新建segment文件的時間,此值可以被topic級別的參數覆蓋。

log.retention.{ms,minutes,hours}:默認值:7 days。Kafka segment log的保存週期,保存週期超過此時間日誌就會被刪除。此參數可以被topic級別參數覆蓋。數據量大時,建議減小此值。

log.retention.bytes:默認值:-1。每個partition的最大容量,若數據量超過此值,partition數據將會被刪除。注意這個參數控制的是每個partition而不是topic。此參數可以被log級別參數覆蓋。

log.retention.check.interval.ms:默認值:5 minutes。刪除策略的檢查週期。

auto.create.topics.enable:默認值:5 minutes。自動創建topic參數,建議此值設置為false,嚴格控制topic管理,防止生產者錯寫topic。

default.replication.factor:默認值:1。默認副本數量,建議改為2。

replica.lag.time.max.ms:默認值:10000。在此窗口時間內沒有收到follower的fetch請求,leader會將其從ISR(in-sync replicas)中移除。

replica.lag.max.messages:默認值:4000。如果replica節點落後leader節點此值大小的消息數量,leader節點就會將其從ISR中移除。

replica.socket.timeout.ms:默認值:30 * 1000。replica向leader發送請求的超時時間。

zookeeper.session.timeout.ms:默認值:6000。ZooKeeper session 超時時間。如果在此時間內server沒有向zookeeper發送心跳,zookeeper就會認為此節點已掛掉。 此值太低導致節點容易被標記死亡;若太高,.會導致太遲發現節點死亡。

zookeeper.connection.timeout.ms:默認值:6000。客戶端連接zookeeper的超時時間。

zookeeper.sync.time.ms:默認值:2000。H ZK follower落後 ZK leader的時間。

controlled.shutdown.enable:默認值:true。允許broker shutdown。如果啟用,broker在關閉自己之前會把它上面的所有leaders轉移到其它brokers上,建議啟用,增加集群穩定性。

delete.topic.enable:默認值:false。啟用deletetopic參數,建議設置為true。

Producer配置信息

metadata.broker.list:啟動時producer查詢brokers的列表,可以是集群中所有brokers的一個子集。注意,這個參數只是用來獲取topic的元信息用,producer會從元信息中挑選合適的broker並與之建立socket連接。格式是:host1:port1,host2:port2。

request.required.acks:默認值:0。

request.timeout.ms:默認值:10000。Broker等待ack的超時時間,若等待時間超過此值,會返回客戶端錯誤信息。

producer.type:默認值:sync。同步異步模式。async表示異步,sync表示同步。如果設置成異步模式,可以允許生產者以batch的形式push數據,這樣會極大的提高broker性能,推薦設置為異步。

serializer.class:kafka.serializer.DefaultEncoder,序列號類,.默認序列化成 byte[] 。

key.serializer.class:Key的序列化類,默認同上。

partitioner.class:kafka.producer.DefaultPartitioner,Partition類,默認對key進行hash。

compression.codec:默認值:none。指定producer消息的壓縮格式,可選參數為: “none”, “gzip” and “snappy”。

compressed.topics:啟用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那麼壓縮僅對本參數指定的topic有效,若本參數為空,則對所有topic有效。

message.send.max.retries:默認值3。Producer發送失敗時重試次數。若網絡出現問題,可能會導致不斷重試。

queue.buffering.max.ms:默認值5000。啟用異步模式時,producer緩存消息的時間。比如我們設置成1000時,它會緩存1秒的數據再一次發送出去,這樣可以極大的增加broker吞吐量,但也會造成時效性的降低。

queue.buffering.max.messages:默認值:10000。採用異步模式時producer buffer 隊列裡最大緩存的消息數量,如果超過這個數值,producer就會阻塞或者丟掉消息。

queue.enqueue.timeout.ms:默認值:-1。當達到上面參數值時producer阻塞等待的時間。如果值設置為0,buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值設置為-1,producer會被阻塞,不會丟消息。

batch.num.messages:默認值:200。採用異步模式時,一個batch緩存的消息數量。達到這個數量值時producer才會發送消息。

Consumer配置信息

group.id:Consumer的組ID,相同goup.id的consumer屬於同一個組。

zookeeper.connect:Consumer的zookeeper連接串,要和broker的配置一致。

consumer.id:如果不設置會自動生成。

socket.timeout.ms:默認值:30 * 1000。網絡請求的socket超時時間。實際超時時間由max.fetch.wait + socket.timeout.ms 確定。

socket.receive.buffer.bytes:默認值:64 * 1024。The socket receive buffer for network requests。

fetch.message.max.bytes:默認值:1024 * 1024。查詢topic-partition時允許的最大消息大小。consumer會為每個partition緩存此大小的消息到內存,因此,這個參數可以控制consumer的內存使用量。這個值應該至少比server允許的最大消息大小大,以免producer發送的消息大於consumer允許的消息。

num.consumer.fetchers:默認值:1。The number fetcher threads used to fetch data。

auto.commit.enable:默認值:true。如果此值設置為true,consumer會週期性的把當前消費的offset值保存到zookeeper。當consumer失敗重啟之後將會使用此值作為新開始消費的值。

auto.commit.interval.ms

:默認值:60 * 1000。Consumer提交offset值到zookeeper的週期。

queued.max.message.chunks:默認值:2。用來被consumer消費的message chunks 數量, 每個chunk可以緩存fetch.message.max.bytes大小的數據量。

auto.commit.interval.ms:默認值:60 * 1000。Consumer提交offset值到zookeeper的週期。

queued.max.message.chunks:默認值:2。用來被consumer消費的message chunks 數量, 每個chunk可以緩存fetch.message.max.bytes大小的數據量。


分享到:


相關文章: