0748-5.14.4-Kafka的擴容和縮容

​文檔編寫目的

在Kafka集群資源使用已超出系統配置的資源時,或者有大量資源閒置造成資源浪費的時候,需要分別通過擴容Kafka和縮容Kafka來進行調整。本篇文章Fayson主要介紹如何進行Kafka的擴容和縮容,以及變更後的Kafka集群如何進行負載均衡的操作。

  • 測試環境:

1.Redhat7.2

2.採用root用戶操作

3.CM為5.16.2,CDH為5.14.4

4.Kafka版本為0.10.2

5.集群啟用了Kerberos,Kafka未啟用Kerberos和Sentry


Kafka集群的擴容

2.1 當前Kafka集群狀態


集群中有3個kafka broker

0748-5.14.4-Kafka的擴容和縮容


有2個topic,分別為test和test1,情況如下

0748-5.14.4-Kafka的擴容和縮容


0748-5.14.4-Kafka的擴容和縮容


2.2 擴容前準備

新擴容的機器要先加入集群中,通過CM管理,按照下面的步驟進行操作

1.修改新添加的機器的hostname

<code>[root@hadoop6 ~]# hostnamectl set-hostname cdh04.hadoop.com/<code>
0748-5.14.4-Kafka的擴容和縮容


2.修改/etc/hosts文件並同步到所有節點,這裡用腳本來實現。

<code>192.168.0.204 cdh01.hadoop.com cdh01192.168.0.205 cdh02.hadoop.com cdh02192.168.0.206 cdh03.hadoop.com cdh03192.168.0.195 cdh04.hadoop.com cdh04/<code>
0748-5.14.4-Kafka的擴容和縮容

0748-5.14.4-Kafka的擴容和縮容


3.新添加的節點關閉防火牆,設置開機自動關閉

<code>[root@cdh04 ~]# systemctl stop firewalld[root@cdh04 ~]# systemctl disable firewalld/<code>
0748-5.14.4-Kafka的擴容和縮容


4.新添加的節點禁用SELinux,並修改修改/etc/selinux/config

<code>[root@cdh04 ~]# setenforce 0setenforce: SELinux is disabled[root@cdh04 ~]# vim /etc/selinux/config/<code>
0748-5.14.4-Kafka的擴容和縮容


5.新添加的節點關閉透明大頁面,並且在/etc/rc.d/rc.local 裡面加入腳本,設置自動開機關閉。

<code>[root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/defrag[root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/enabled[root@cdh04 ~]# vim /etc/rc.d/rc.localif test -f /sys/kernel/mm/transparent_hugepage/defragthen echo never > /sys/kernel/mm/transparent_hugepage/defragfiif test -f /sys/kernel/mm/transparent_hugepage/enabledthen echo never > /sys/kernel/mm/transparent_hugepage/enabledfi/<code>
0748-5.14.4-Kafka的擴容和縮容


<code>[root@cdh04 ~]# chmod +x /etc/rc.d/rc.local/<code>


6.新添加的節點設置SWAP,並且設置開機自動更改

<code>[root@cdh04 ~]# sysctl vm.swappiness=1vm.swappiness = 1[root@cdh04 ~]# echo vm.swappiness = 1  >> /etc/sysctl.conf/<code>
0748-5.14.4-Kafka的擴容和縮容


7.新添加的節點配置時鐘同步,先在所有服務器卸載chrony,再安裝ntp,再修改配置把時鐘同步跟其他kafka broker節點保持一致

<code>[root@cdh04 ~]# yum -y remove chrony[root@cdh04 ~]# yum -y install ntp/<code>
0748-5.14.4-Kafka的擴容和縮容

<code>vim /etc/ntp.conf/<code>
0748-5.14.4-Kafka的擴容和縮容

啟動ntp服務,並設置開機啟動

<code>[root@cdh04 java]# systemctl start ntpd[root@cdh04 java]# systemctl enable ntpd/<code>


8.新添加的節點安裝kerberos客戶端,並進行配置

<code>[root@cdh04 ~]# yum -y install krb5-libs krb5-workstation/<code>
0748-5.14.4-Kafka的擴容和縮容

從其他節點拷貝/etc/krb5.conf文件到新節點

0748-5.14.4-Kafka的擴容和縮容


9.從其他節點拷貝集群使用的JDK到新節點/usr/java目錄下

<code>[root@cdh01 java]# scp -r jdk1.8.0_131/ cdh04:/usr/java/jdk1.8.0_131//<code>
0748-5.14.4-Kafka的擴容和縮容


10.在CM編輯主機模板kafka,只添加kafka broker角色

0748-5.14.4-Kafka的擴容和縮容


2.3 擴容Kafka

1.添加新的節點到集群,從CM主頁點擊Add Hosts,如下圖所示

0748-5.14.4-Kafka的擴容和縮容


2.點擊繼續

0748-5.14.4-Kafka的擴容和縮容


3.搜索主機,輸入主機名,點擊搜索並繼續

0748-5.14.4-Kafka的擴容和縮容

0748-5.14.4-Kafka的擴容和縮容


4.輸入自定義存儲庫地址,並繼續

0748-5.14.4-Kafka的擴容和縮容


5.點擊安裝JDK並繼續

0748-5.14.4-Kafka的擴容和縮容


6.輸入主機密碼並繼續

0748-5.14.4-Kafka的擴容和縮容


7.正在進行安裝,安裝成功後點擊繼續

0748-5.14.4-Kafka的擴容和縮容

0748-5.14.4-Kafka的擴容和縮容

0748-5.14.4-Kafka的擴容和縮容

激活完成,點擊繼續

0748-5.14.4-Kafka的擴容和縮容


8.點擊繼續,進入主機檢查,添加主機完成

0748-5.14.4-Kafka的擴容和縮容


9.應用主機模板kafka,擴容完成

0748-5.14.4-Kafka的擴容和縮容

0748-5.14.4-Kafka的擴容和縮容


擴容完成

0748-5.14.4-Kafka的擴容和縮容


擴容後平衡

在擴容完成後,可以通過自帶的命令來生成topic的平衡策略和執行平衡的操作。

3.1 生成平衡策略

1.查詢當前創建的topic

<code>[root@cdh01 ~]# kafka-topics --list --zookeeper cdh01.hadoop.com:2181/<code>
0748-5.14.4-Kafka的擴容和縮容


2.按照查詢到的topic來創建文件topics-to-move.json 格式如下

<code>{"topics": [{"topic": "test"},{"topic": "test1"}            ],"version":1}/<code>


0748-5.14.4-Kafka的擴容和縮容


3.在CM查詢kafka broker的ID

0748-5.14.4-Kafka的擴容和縮容


4.用命令生成遷移方案,下面畫紅框的就是生成的平衡方案

<code>kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json  --broker-list "121,124,125,126" --generate/<code>
0748-5.14.4-Kafka的擴容和縮容


5.把Proposed partition reassignment configuration下的內容複製保存為json文件newkafka.json,平衡方案生成完成。

0748-5.14.4-Kafka的擴容和縮容


3.2 執行平衡策略

1.執行下面的命令來進行平衡

<code>[root@cdh01 ~]# kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute/<code>
0748-5.14.4-Kafka的擴容和縮容


2.用下面命令來進行查看執行的進度

<code>kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify/<code>
0748-5.14.4-Kafka的擴容和縮容


從結果可以看到已經執行完成。,並且在新添加的節點上也可以看到有topic的副本存過來了。

0748-5.14.4-Kafka的擴容和縮容

縮容前的準備

1.編輯之前創建的topics-to-move.json文件,添加上系統自動生成的__consumer_offsets

<code>[root@cdh01 ~]# vim topics-to-move.json {"topics": [{"topic": "test"},{"topic": "__consumer_offsets"},{"topic": "test1"}            ],"version":1}/<code>
0748-5.14.4-Kafka的擴容和縮容


2.再次使用命令生成遷移計劃,這裡只選取121,124,126這三個broker,然後把生成計劃中的126替換成125進行保存,這樣就把126上的數據全部遷移到了125上。

<code>kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json  --broker-list "121,124,126" --generate/<code>
0748-5.14.4-Kafka的擴容和縮容

0748-5.14.4-Kafka的擴容和縮容


3.執行遷移命令,進行遷移

<code>kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute/<code>
0748-5.14.4-Kafka的擴容和縮容


4.進行查詢,遷移完成

<code>kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify/<code>
0748-5.14.4-Kafka的擴容和縮容


5.在要刪除的broker上也可以看到,topic數據已經遷移走

0748-5.14.4-Kafka的擴容和縮容


Kafka集群的縮容

在完成上訴縮容前的準備後,現在可以進行kafka集群的縮容。

1.從CM進入Kafka的實例界面

0748-5.14.4-Kafka的擴容和縮容


2.勾選要刪除的broker,先停止該broker

0748-5.14.4-Kafka的擴容和縮容


3.停止完成後,進行刪除

0748-5.14.4-Kafka的擴容和縮容


刪除完成。

0748-5.14.4-Kafka的擴容和縮容

總結

1.Kafka集群的擴容和縮容可以通過CM來進行添加broker和刪除broker來進行。


2.在Kafka集群擴容後,已有topic的partition不會自動均衡到新的磁盤上。可以通過kafka-reassign-partitions命令來進行數據平衡,先用命令生成平衡方案,再執行。也可以手動編輯遷移方案來進行執行。


3.新建topic的partition, 會以磁盤為單位,按照partition數量最少的來落盤。


4.在Kafka縮容前,需要把要刪除的broker上的topic數據遷出,也可以通過kafka-reassign-partitions來進行遷移,手動編輯遷移方案,再通過命令執行即可。


5.kafka-reassign-partitions這個命令,只有指定了broker id上的topic才會參與partition的reassign。根據我們的需求,可以手動來編寫和修改遷移計劃。


分享到:


相關文章: