Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

如果您覺得“大數據開發運維架構”對你有幫助,歡迎轉發朋友圈


這裡文章從公眾號轉過來 圖片效果不太好 有興趣請關注公眾號,或者去公眾號查看

一、概述:

Kafka集群的安裝配置請參考我的上一篇文章:Kafka入門:集群安裝部署(最新版kafka-2.4.0)。從Kafka0.9.0.0開始,為提高集群的安全性,Kafka社區增加了許多功能;Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三種認證機制。

目前支持以下安全措施:

1.clients 與 brokers 認證

2.brokers 與 zookeeper認證

3.數據傳輸加密 between brokers and clients, between brokers, or between brokers and tools using SSL

4.授權clients read/write

認證版本支持:

1.SASL/GSSAPI (Kerberos) - 從0.9.0.0開始支持

2.SASL/PLAIN - 從 0.10.0.0開始支持

3.SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 -從0.10.2.0開始支持

SSL相關知識:

1.JavaSSL認證

SSL(Secure Socket Layer安全套接層),及其繼任者傳輸層安全(Transport ;ayer Security,TLS)是為網絡通信提供安全及數據完整性的一種安全協議。TLS與SSL在傳輸層對網絡連接進行加密。

2.Kerberos認證 + ACL鑑權

Kerberos是一種網絡認證協議,其設計目標是通過密鑰系統為客戶機/服務器應用程序提供強大的認證服務。ACL則是在Kerberos的基礎上進行的鑑權措施,一般Kerberos認證就夠使用了。


二、SSL證書生成


Apache的Kafka允許client通過SSL連接。SSL默認情況下被禁止,但可以根據需要開啟:


您可以使用Java的keytool工具來完成,Keytool 是一個Java 數據證書的管理工具 ,Keytool 將密鑰(key)和證書(certificates)存在一個稱為keystore的文件中 在keystore裡,包含兩種數據:

1)..密鑰實體(Key entity)——密鑰(secret key)又或者是私鑰和配對公鑰(採用非對稱加密)

2).可信任的證書實體(trusted certificate entries)——只包含公鑰

keytool相關指令說明:


名稱說明

-alias

別名,可自定義,這裡叫kafka240

-keystore

指定密鑰庫的名稱(就像數據庫一樣的證書庫,可以有很多個證書,cacerts這個文件是jre自帶的, 也可以使用其它文件名字,如果沒有這個文件名字,它會創建這樣一個)

-storepass

指定密鑰庫的密碼-keypass指定別名條目的密碼-list顯示密鑰庫中的證書信息

-export

將別名指定的證書導出到文件-file參數指定導出到文件的文件名

-import

將已簽名數字證書導入密鑰庫-keypasswd修改密鑰庫中指定條目口令

-dname

指定證書擁有者信息。

其中,CN=名字與姓氏/域名,OU=組織單位名稱,O=組織名稱,L=城市或區域名稱,ST=州或省份名稱,C=單位的兩字母國家代碼

-keyalg

指定密鑰的算法-validity指定創建的證書有效期多少天-keysize指定密鑰長度


1.Kafka集群的每個broker節點生成SSL密鑰和證書(每個broker節執行)


每個節點執行一次後,集群中的每一臺機器都有一個公私密鑰對、一個標識該機器的證書,注意這裡是所有的broker節點都要執行這個命令。

<code>keytool -keystore server.keystore.jks -alias kafka240 -validity 365 -genkey/<code>

執行下面命令時,需要輸入密碼,自己記住就行,下面會需要,有一個比較重要的地方,輸入first and last name,這裡我理解的有點不夠透徹,這裡最好輸入你的主機名,確保公用名(CN)與服務器的完全限定域名(FQDN)精確相匹配。client拿CN與DNS域名進行比較以確保它確實連接到所需的服務器,而不是惡意的服務器。

31節點執行,輸入31主機名,如圖:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

32節點執行,輸入32主機名,如圖:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)


2.生成CA認證證書(為了保證整個證書的安全性,需要使用CA進行證書的簽名保證)


雖然第一步生成了證書,但是證書是無標記的,意味著攻擊者可以通過創建相同的證書假裝任何機器。認證機構(CA)負責簽發證書。認證機構就像發行護照的政府,政府會對每張護照蓋章,使得護照很難被偽造。其它,政府核實印章,以保證此護照是真實的。類似的,CA簽署證書,密碼保證簽署的證書在計算上很難被偽造。因此,只要CA是一個真正值得信賴的權威機構,客戶就可以很高的保證他們正在連接到真實的機器。

<code>openssl req -new -x509 -keyout ca-key -out ca-cert -days 36/<code>

上面這個命令,可隨機在任一broker節點執行,只需要執行一次,執行完成後生成了兩個文件cat-key、ca-cert,將這兩個文件分別拷貝到所有broker節點上,這樣所有的broker都有了這兩個文件。

3.通過CA證書創建一個客戶端端信任證書(每個broker節點執行)

<code>keytool -keystore client.truststore.jks -alias CAKafka240 -import -file ca-cert/<code>

4.通過CA證書創建一個服務端器端信任證書(每個broker節點執行)

<code>keytool -keystore server.truststore.jks -alias CAKafka240 -import -file ca-cert/<code>

下面就是為證書籤名

5.從密鑰庫導出證書服務器端證書cert-file(每個broker節點執行)

<code>keytool -keystore server.keystore.jks -alias kafka240 -certreq -file cert-file/<code>

6.用CA給服務器端證書進行簽名處理(每個broker節點執行)

<code>openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456/<code>

7.將CA證書導入到服務器端keystore(每個broker節點執行)

<code>keytool -keystore server.keystore.jks -alias CAKafka240 -import -file ca-cert/<code>

8.將已簽名的服務器證書導入到服務器keystore(每個broker節點執行)

<code>keytool -keystore server.keystore.jks -alias kafka240 -import -file cert-signed/<code>

經過以上步驟,集群的每個broker節點都會有以下文件:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

至此服務端證書生成完畢。下面需要給kafka集群配置SSL加密認證


三、Kafka集群配置

在每個broker節點上配置,config/server.properties文件,這裡只修改紅框中的配置,其他配置項看我上一篇文章配置即可,如圖:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

注:如果設置的內部broker的通訊協議PLAINTEXT,那麼監聽PLAINTEXT的時候就需要作相應的配置 listeners=PLAINTEXT://host.name:port,SSL://host.name:port。

如果配置SSL之前,存在Kafka數據,那麼建議重新換一個位置來存放數據;如果確保之前的數據已經沒什麼用了,也可以直接刪除,然後在各個broker節點執行以下命令啟動集群:

<code>/home/kafka/kafka_2.12-2.4.0/bin/kafka-server-start.sh /home/kafka/kafka_2.12-2.4.0/config/server.properties &/<code>

用liunx自帶的openssl命令來驗證,SSL配置是否正確:

<code>openssl s_client -debug -connect salver32.hadoop.ljs:9093 -tls1/<code>

返回如下結果,則證明配置成功:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)


四、客戶端連接配置

1.配置了SSL認證的集群,通過Kafka命令連接時,需要配置ssl認證進行連接:

Producer消費者發送消息,先新建文件producer.properties(文件名自定義):

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

<code>bootstrap.servers=10.124.164.31:9093,10.124.165.32:9093security.protocol=SSLssl.endpoint.identification.algorithm=ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks/<code> 

發送消息命令:

<code>kafka-console-consumer.sh --bootstrap-server 10.168.192.31:9093,10.168.192.32:9093 --from-beginning --topic topic1 --consumer.config consum.properties /<code>

Consumer消費者接受消息,先新建文件comsumer.properties(文件名自定義):

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

<code>security.protocol=SSLssl.endpoint.identification.algorithm=group.id=group_topic1ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks/<code>

消費消息命令:

<code>kafka-console-producer.sh --broker-list 10.168.192.31:9093,10.168.192.32:9093  --topic topic1  --producer.config producer.properties/<code>

2.如果客戶端需要通過Java代碼連接kafka集群,需要先生成客戶端連接從證書,跟服務端SSL證書生成類似,依次執行以下5行命令,這裡我就不再一一細說了,比較簡單,命令如下:

<code>客戶端: 導出客戶端證書 生成client.keystore.jks文件(即:生成客戶端的keystore文件)keytool -keystore client.keystore.jks -alias kafka240 -validity 365 -genkey將證書文件導入到客戶端keystorekeytool -keystore client.keystore.jks -alias kafka240 -certreq -file client.cert-file 用CA給客戶端證書進行簽名處理openssl x509 -req -CA ca-cert -CAkey ca-key -in client.cert-file -out client.cert-signed -days 365 -CAcreateserial -passin pass:123456將CA證書導入到客戶端keystorekeytool -keystore client.keystore.jks -alias CAKafka240 -import -file ca-cert 將已簽名的證書導入到客戶端keystorekeytool -keystore client.keystore.jks -alias kafka240 -import -file client.cert-signed /<code>

執行完成後,應該會生成以下紅框中三個文件:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)

拷貝兩個文件client.keystore.jks、client.truststore.jks到本地:

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.4.0)


Producer端代碼實例:

<code>package com.hadoop.ljs.kafka010;import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.apache.kafka.common.config.SslConfigs;import java.util.Properties;import java.util.Random;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */public class KafkaSslProducer {    public static final String topic="topic1";    public static final String bootstrap_server="10.168.192.31:9093,10.168.192.32:9093";    public static final String client_truststore="D:\\\\kafkaSSL\\\\client.truststore.jks";    public static final String client_keystore="D:\\\\kafkaSSL\\\\client.keystore.jks";    public static final String client_ssl_password="123456";        public static void main(String[] args){        Properties props = new Properties();        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);        //configure the following three settings for SSL Encryption        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  client_ssl_password);        // configure the following three settings for SSL Authentication        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);        props.put(ProducerConfig.ACKS_CONFIG, "all");        props.put(ProducerConfig.RETRIES_CONFIG, 0);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");        Producer<string> producer = new KafkaProducer<string>(props);        TestCallback callback = new TestCallback();        Random rnd = new Random();        for (long i = 0; i <= 2 ; i++) {            String  key="lujisenKey-" + i;            String value="lujisenMessage------------"+i;            System.out.println("Send Message: "+"Key:"+key+"Value:"+value);            ProducerRecord<string> data = new ProducerRecord<string>(                    topic, key, value);            producer.send(data, callback);        }        producer.close();    }    private static class TestCallback implements Callback {        @Override        public void onCompletion(RecordMetadata recordMetadata, Exception e) {            if (e != null) {                System.out.println("Error while producing message to topic :" + recordMetadata);                e.printStackTrace();            } else {                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());                System.out.println(message);            }        }    }}/<string>/<string>/<string>/<string>/<code>

Consumer端代碼實例:

<code>package com.hadoop.ljs.kafka010;import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.config.SslConfigs;import java.util.Arrays;import java.util.Collection;import java.util.Collections;import java.util.Properties;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */public class KafkaSslConsumer {    public static final String topic="topic1";    public static final String bootstrap_server="10.168.192.31:9093,10.168.192.32:9093";    public static final String client_truststore="D:\\\\kafkaSSL\\\\client.truststore.jks";    public static final String client_keystore="D:\\\\kafkaSSL\\\\client.keystore.jks";    public static final String client_ssl_password="123456";    public static final String consumer_group="group2_topic1";    public static void main(String[] args) {        Properties props = new Properties();        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);        //configure the following three settings for SSL Encryption        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore);        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  client_ssl_password);        //configure the following three settings for SSL Authentication        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore);        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password);        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password);        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_group);        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");        KafkaConsumer<byte> consumer = new KafkaConsumer<>(props);        TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener();        consumer.subscribe(Collections.singletonList(topic), rebalanceListener);        while (true) {            ConsumerRecords<byte> records = consumer.poll(1000);            for (ConsumerRecord<byte> record : records) {                System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());            }            consumer.commitSync();        }    }    private static class  TestConsumerRebalanceListener implements ConsumerRebalanceListener {        @Override        public void onPartitionsRevoked(Collection<topicpartition> partitions) {            System.out.println("Called onPartitionsRevoked with partitions:" + partitions);        }        @Override        public void onPartitionsAssigned(Collection<topicpartition> partitions) {            System.out.println("Called onPartitionsAssigned with partitions:" + partitions);        }    }}/<topicpartition>/<topicpartition>/<byte>/<byte>/<byte>/<code>

至此整個Kafka集群的SSL加密認證配置完成,有些地方整理的比較粗,如果問題及時給我在公眾號留言,看到後我會及時回覆!!!


分享到:


相關文章: