kafka性能調優

kafka是一個高吞吐量分佈式消息系統,並且提供了持久化。其高性能的有兩個重要特點:

  • 利用了磁盤連續讀寫性能遠遠高於隨機讀寫的特點;
  • 併發,將一個topic拆分多個partition。

要充分發揮kafka的性能,就需要滿足這兩個條件

kafka讀寫的單位是partition,因此,將一個topic拆分為多個partition可以提高吞吐量。但是,這裡有個前提,就是不同partition需 要位於不同的磁盤(可以在同一個機器)。如果多個partition位於同一個磁盤,那麼意味著有多個進程同時對一個磁盤的多個文 件進行讀寫,使得操作系統會對磁盤讀寫進行頻繁調度,也就是破壞了磁盤讀寫的連續性。

在linkedlin的測試中,每臺機器就加載了6個磁盤,並且不做raid,就是為了充分利用多磁盤併發讀寫,又保證每個磁盤連續讀寫 的特性。

具體配置上,是將不同磁盤的多個目錄配置到broker的log.dirs,例如

log.dirs=/disk1/kafka-logs,/disk2/kafka-logs,/disk3/kafka-logs

kafka會在新建partition的時候,將新partition分佈在partition最少的目錄上,因此,一般不能將同一個磁盤的多個目錄設置到log.dirs

同一個ConsumerGroup內的Consumer和Partition在同一時間內必須保證是一對一的消費關係

任意Partition在某一個時刻只能被一個Consumer Group內的一個Consumer消費(反過來一個Consumer則可以同時消費多個Partition)

JVM參數配置

推薦使用最新的G1來代替CMS作為垃圾回收器。

推薦使用的最低版本為JDK 1.7u51。下面是本次試驗中Broker的JVM內存配置參數:

-Xms30g -Xmx30g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35

  • 1

G1相比較於CMS的優勢:

  • G1是一種適用於服務器端的垃圾回收器,很好的平衡了吞吐量和響應能力
  • 對於內存的劃分方法不同,Eden, Survivor, Old區域不再固定,使用內存會更高效。G1通過對內存進行Region的劃分,有效避免了內存碎片問題。
  • G1可以指定GC時可用於暫停線程的時間(不保證嚴格遵守)。而CMS並不提供可控選項。
  • CMS只有在FullGC之後會重新合併壓縮內存,而G1把回收和合並集合在一起。
  • CMS只能使用在Old區,在清理Young時一般是配合使用ParNew,而G1可以統一兩類分區的回收算法。

G1的適用場景:

  • JVM佔用內存較大(At least 4G)
  • 應用本身頻繁申請、釋放內存,進而產生大量內存碎片時。
  • 對於GC時間較為敏感的應用。

JVM參數詳解:http://blog.csdn.net/lizhitao/article/details/44677659

Broker參數配置

配置優化都是修改server.properties文件中參數值

1. 網絡和io操作線程配置優化

# broker處理消息的最大線程數

num.network.threads=xxx

# broker處理磁盤IO的線程數

num.io.threads=xxx

<strong>建議配置:

用於接收並處理網絡請求的線程數,默認為3。其內部實現是採用Selector模型。啟動一個線程作為Acceptor來負責建立連接,再配合啟動num.network.threads個線程來輪流負責從Sockets裡讀取請求,一般無需改動,除非上下游併發請求量過大。一般num.network.threads主要處理網絡io,讀寫緩衝區數據,基本沒有io等待,配置線程數量為cpu核數加1.

num.io.threads主要進行磁盤io操作,高峰期可能有些io等待,因此配置需要大些。配置線程數量為cpu核數2倍,最大不超過3倍.

2. log數據文件刷盤策略

為了大幅度提高producer寫入吞吐量,需要定期批量寫文件。

建議配置:

# 每當producer寫入10000條消息時,刷數據到磁盤 log.flush.interval.messages=10000

# 每間隔1秒鐘時間,刷數據到磁盤

log.flush.interval.ms=1000


3. 日誌保留策略配置

當kafka server的被寫入海量消息後,會生成很多數據文件,且佔用大量磁盤空間,如果不及時清理,可能磁盤空間不夠用,kafka默認是保留7天。

建議配置:

# 保留三天,也可以更短

log.retention.hours=72

# 段文件配置1GB,有利於快速回收磁盤空間,重啟kafka加載也會加快(如果文件過小,則文件數量比較多,

# kafka啟動時是單線程掃描目錄(log.dir)下所有數據文件)

log.segment.bytes=1073741824


Tips

  • Kafka官方並不建議通過Broker端的log.flush.interval.messages和log.flush.interval.ms來強制寫盤,認為數據的可靠性應該通過Replica來保證,而強制Flush數據到磁盤會對整體性能產生影響。
  • 可以通過調整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio來調優性能。
  1. 髒頁率超過第一個指標會啟動pdflush開始Flush Dirty PageCache。
  2. 髒頁率超過第二個指標會阻塞所有的寫操作來進行Flush。
  3. 根據不同的業務需求可以適當的降低dirty_background_ratio和提高dirty_ratio。

如果topic的數據量較小可以考慮減少log.flush.interval.ms和log.flush.interval.messages來強制刷寫數據,減少可能由於緩存數據未寫盤帶來的不一致。

4. 配置jmx服務

kafka server中默認是不啟動jmx端口的,需要用戶自己配置

[lizhitao@root kafka_2.10-0.8.1]$ vim bin/kafka-run-class.sh

#最前面添加一行

JMX_PORT=8060


5. Replica相關配置:

replica.lag.time.max.ms:10000

replica.lag.max.messages:4000

num.replica.fetchers:1

#在Replica上會啟動若干Fetch線程把對應的數據同步到本地,而num.replica.fetchers這個參數是用來控制Fetch線程的數量。

#每個Partition啟動的多個Fetcher,通過共享offset既保證了同一時間內Consumer和Partition之間的一對一關係,又允許我們通過增多Fetch線程來提高效率。

default.replication.factor:1

#這個參數指新創建一個topic時,默認的Replica數量

#Replica過少會影響數據的可用性,太多則會白白浪費存儲資源,一般建議在2~3為宜。


6. purgatory

fetch.purgatory.purge.interval.requests:1000

producer.purgatory.purge.interval.requests:1000

  • 首先來介紹一下這個“煉獄”究竟是用來做什麼用的。Broker的一項主要工作就是接收並處理網絡上發來的Request。這些Request其中有一些是可以立即答覆的,那很自然這些Request會被直接回復。另外還有一部分是沒辦法或者Request自發的要求延時答覆(例如發送和接收的Batch),Broker會把這種Request放入Paurgatory當中,同時每一個加入Purgatory當中的Request還會額外的加入到兩個監控對隊列:
  • WatcherFor隊列:用於檢查Request是否被滿足。
  • DelayedQueue隊列:用於檢測Request是否超時。

Request最終的狀態只有一個,就是Complete。請求被滿足和超時最終都會被統一的認為是Complete。

目前版本的Purgatory設計上是存在一定缺陷的。Request狀態轉變為Complete後,並沒能立即從Purgatory中移除,而是繼續佔用資源,因此佔用內存累積最終會引發OOM。這種情況一般只會在topic流量較少的情況下觸發。更詳細的資料可以查閱擴展閱讀,在此不做展開。

在實際使用中我也是踩了這個坑過來的,當時的情況是集群新上了一個topic,初期該topic數據很少(Low volume topic),導致那段時間在凌晨3,4點左右會隨機有Broker因為OOM掛掉。定位原因後把*.purgatory.purge.interval.requests的配置調整小至100就解決了這個問題。

Kafka的研發團隊已經開始著手重新設計Purgatory,力求能夠讓Request在Complete時立即從Purgatory中移除。

  1. 其他

num.partitions:1

#分區數量

queued.max.requests:500

#這個參數是指定用於緩存網絡請求的隊列的最大容量,這個隊列達到上限之後將不再接收新請求。一般不會成為瓶頸點,除非I/O性能太差,這時需要配合num.io.threads等配置一同進行調整。

compression.codec:none

#Message落地時是否採用以及採用何種壓縮算法。一般都是把Producer發過來Message直接保存,不再改變壓縮方式。

in.insync.replicas:1

#這個參數只能在topic層級配置,指定每次Producer寫操作至少要保證有多少個在ISR的Replica確認,一般配合request.required.acks使用。要注意,這個參數如果設置的過高可能會大幅降低吞吐量。


Producer優化

buffer.memory:33554432 (32m)

#在Producer端用來存放尚未發送出去的Message的緩衝區大小。緩衝區滿了之後可以選擇阻塞發送或拋出異常,由block.on.buffer.full的配置來決定。

compression.type:none

#默認發送不進行壓縮,推薦配置一種適合的壓縮算法,可以大幅度的減緩網絡壓力和Broker的存儲壓力。

linger.ms:0

#Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然後再發送,以此提高吞吐量,而linger.ms則更進一步,這個參數為每次發送增加一些delay,以此來聚合更多的Message。

batch.size:16384

#Producer會嘗試去把發往同一個Partition的多個Requests進行合併,batch.size指明瞭一次Batch合併後Requests總大小的上限。如果這個值設置的太小,可能會導致所有的Request都不進行Batch。

acks:1

#這個配置可以設定發送消息後是否需要Broker端返回確認。

0: 不需要進行確認,速度最快。存在丟失數據的風險。

1: 僅需要Leader進行確認,不需要ISR進行確認。是一種效率和安全折中的方式。

all: 需要ISR中所有的Replica給予接收確認,速度最慢,安全性最高,但是由於ISR可能會縮小到僅包含一個Replica,所以設置參數為all並不能一定避免數據丟失。


Consumer優化

num.consumer.fetchers:1

#啟動Consumer的個數,適當增加可以提高併發度。

fetch.min.bytes:1

#每次Fetch Request至少要拿到多少字節的數據才可以返回。

#在Fetch Request獲取的數據至少達到fetch.min.bytes之前,允許等待的最大時長。對應上面說到的Purgatory中請求的超時時間。

fetch.wait.max.ms:100


  • 通過Consumer Group,可以支持生產者消費者和隊列訪問兩種模式。
  • Consumer API分為High level和Low level兩種。前一種重度依賴Zookeeper,所以性能差一些且不自由,但是超省心。第二種不依賴Zookeeper服務,無論從自由度和性能上都有更好的表現,但是所有的異常(Leader遷移、Offset越界、Broker宕機等)和Offset的維護都需要自行處理。
  • 大家可以關注下不日發佈的0.9 Release。開發人員又用Java重寫了一套Consumer。把兩套API合併在一起,同時去掉了對Zookeeper的依賴。據說性能有大幅度提升哦~~


分享到:


相關文章: