如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈
这里文章从公众号转过来 图片效果不太好 有兴趣请关注公众号,或者去公众号查看
一、概述:
Kafka集群的安装配置请参考我的上一篇文章:Kafka入门:集群安装部署(最新版kafka-2.4.0)。从Kafka0.9.0.0开始,为提高集群的安全性,Kafka社区增加了许多功能;Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三种认证机制。
目前支持以下安全措施:
1.clients 与 brokers 认证
2.brokers 与 zookeeper认证
3.数据传输加密 between brokers and clients, between brokers, or between brokers and tools using SSL
4.授权clients read/write
认证版本支持:
1.SASL/GSSAPI (Kerberos) - 从0.9.0.0开始支持
2.SASL/PLAIN - 从 0.10.0.0开始支持
3.SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 -从0.10.2.0开始支持
SSL相关知识:
1.JavaSSL认证
SSL(Secure Socket Layer安全套接层),及其继任者传输层安全(Transport ;ayer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS与SSL在传输层对网络连接进行加密。
2.Kerberos认证 + ACL鉴权
Kerberos是一种网络认证协议,其设计目标是通过密钥系统为客户机/服务器应用程序提供强大的认证服务。ACL则是在Kerberos的基础上进行的鉴权措施,一般Kerberos认证就够使用了。
二、SSL证书生成
Apache的Kafka允许client通过SSL连接。SSL默认情况下被禁止,但可以根据需要开启:
您可以使用Java的keytool工具来完成,Keytool 是一个Java 数据证书的管理工具 ,Keytool 将密钥(key)和证书(certificates)存在一个称为keystore的文件中 在keystore里,包含两种数据:
1)..密钥实体(Key entity)——密钥(secret key)又或者是私钥和配对公钥(采用非对称加密)
2).可信任的证书实体(trusted certificate entries)——只包含公钥
keytool相关指令说明:
名称说明
-alias
别名,可自定义,这里叫kafka240
-keystore
指定密钥库的名称(就像数据库一样的证书库,可以有很多个证书,cacerts这个文件是jre自带的, 也可以使用其它文件名字,如果没有这个文件名字,它会创建这样一个)
-storepass
指定密钥库的密码-keypass指定别名条目的密码-list显示密钥库中的证书信息
-export
将别名指定的证书导出到文件-file参数指定导出到文件的文件名
-import
将已签名数字证书导入密钥库-keypasswd修改密钥库中指定条目口令
-dname
指定证书拥有者信息。
其中,CN=名字与姓氏/域名,OU=组织单位名称,O=组织名称,L=城市或区域名称,ST=州或省份名称,C=单位的两字母国家代码
-keyalg
指定密钥的算法-validity指定创建的证书有效期多少天-keysize指定密钥长度
1.Kafka集群的每个broker节点生成SSL密钥和证书(每个broker节执行)
每个节点执行一次后,集群中的每一台机器都有一个公私密钥对、一个标识该机器的证书,注意这里是所有的broker节点都要执行这个命令。
<code>keytool -keystore server.keystore.jks -alias kafka240 -validity 365 -genkey/<code>
执行下面命令时,需要输入密码,自己记住就行,下面会需要,有一个比较重要的地方,输入first and last name,这里我理解的有点不够透彻,这里最好输入你的主机名,确保公用名(CN)与服务器的完全限定域名(FQDN)精确相匹配。client拿CN与DNS域名进行比较以确保它确实连接到所需的服务器,而不是恶意的服务器。
31节点执行,输入31主机名,如图:
32节点执行,输入32主机名,如图:
2.生成CA认证证书(为了保证整个证书的安全性,需要使用CA进行证书的签名保证)
虽然第一步生成了证书,但是证书是无标记的,意味着攻击者可以通过创建相同的证书假装任何机器。认证机构(CA)负责签发证书。认证机构就像发行护照的政府,政府会对每张护照盖章,使得护照很难被伪造。其它,政府核实印章,以保证此护照是真实的。类似的,CA签署证书,密码保证签署的证书在计算上很难被伪造。因此,只要CA是一个真正值得信赖的权威机构,客户就可以很高的保证他们正在连接到真实的机器。
<code>openssl req -new -x509 -keyout ca-key -out ca-cert -days 36/<code>
上面这个命令,可随机在任一broker节点执行,只需要执行一次,执行完成后生成了两个文件cat-key、ca-cert,将这两个文件分别拷贝到所有broker节点上,这样所有的broker都有了这两个文件。
3.通过CA证书创建一个客户端端信任证书(每个broker节点执行)
<code>keytool -keystore client.truststore.jks -alias CAKafka240 -import -file ca-cert/<code>
4.通过CA证书创建一个服务端器端信任证书(每个broker节点执行)
<code>keytool -keystore server.truststore.jks -alias CAKafka240 -import -file ca-cert/<code>
下面就是为证书签名
5.从密钥库导出证书服务器端证书cert-file(每个broker节点执行)
<code>keytool -keystore server.keystore.jks -alias kafka240 -certreq -file cert-file/<code>
6.用CA给服务器端证书进行签名处理(每个broker节点执行)
<code>openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456/<code>
7.将CA证书导入到服务器端keystore(每个broker节点执行)
<code>keytool -keystore server.keystore.jks -alias CAKafka240 -import -file ca-cert/<code>
8.将已签名的服务器证书导入到服务器keystore(每个broker节点执行)
<code>keytool -keystore server.keystore.jks -alias kafka240 -import -file cert-signed/<code>
经过以上步骤,集群的每个broker节点都会有以下文件:
至此服务端证书生成完毕。下面需要给kafka集群配置SSL加密认证
三、Kafka集群配置
在每个broker节点上配置,config/server.properties文件,这里只修改红框中的配置,其他配置项看我上一篇文章配置即可,如图:
注:如果设置的内部broker的通讯协议PLAINTEXT,那么监听PLAINTEXT的时候就需要作相应的配置 listeners=PLAINTEXT://host.name:port,SSL://host.name:port。
如果配置SSL之前,存在Kafka数据,那么建议重新换一个位置来存放数据;如果确保之前的数据已经没什么用了,也可以直接删除,然后在各个broker节点执行以下命令启动集群:
<code>/home/kafka/kafka_2.12-2.4.0/bin/kafka-server-start.sh /home/kafka/kafka_2.12-2.4.0/config/server.properties &/<code>
用liunx自带的openssl命令来验证,SSL配置是否正确:
<code>openssl s_client -debug -connect salver32.hadoop.ljs:9093 -tls1/<code>
返回如下结果,则证明配置成功:
四、客户端连接配置
1.配置了SSL认证的集群,通过Kafka命令连接时,需要配置ssl认证进行连接:
Producer消费者发送消息,先新建文件producer.properties(文件名自定义):
<code>bootstrap.servers=10.124.164.31:9093,10.124.165.32:9093security.protocol=SSLssl.endpoint.identification.algorithm=ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks/<code>
发送消息命令:
<code>kafka-console-consumer.sh --bootstrap-server 10.168.192.31:9093,10.168.192.32:9093 --from-beginning --topic topic1 --consumer.config consum.properties /<code>
Consumer消费者接受消息,先新建文件comsumer.properties(文件名自定义):
<code>security.protocol=SSLssl.endpoint.identification.algorithm=group.id=group_topic1ssl.truststore.location=/home/cuadmin/ljs/kafkaSSL/server.truststore.jksssl.truststore.password=123456ssl.keystore.password=123456ssl.keystore.location=/home/cuadmin/ljs/kafkaSSL/server.keystore.jks/<code>
消费消息命令:
<code>kafka-console-producer.sh --broker-list 10.168.192.31:9093,10.168.192.32:9093 --topic topic1 --producer.config producer.properties/<code>
2.如果客户端需要通过Java代码连接kafka集群,需要先生成客户端连接从证书,跟服务端SSL证书生成类似,依次执行以下5行命令,这里我就不再一一细说了,比较简单,命令如下:
<code>客户端: 导出客户端证书 生成client.keystore.jks文件(即:生成客户端的keystore文件)keytool -keystore client.keystore.jks -alias kafka240 -validity 365 -genkey将证书文件导入到客户端keystorekeytool -keystore client.keystore.jks -alias kafka240 -certreq -file client.cert-file 用CA给客户端证书进行签名处理openssl x509 -req -CA ca-cert -CAkey ca-key -in client.cert-file -out client.cert-signed -days 365 -CAcreateserial -passin pass:123456将CA证书导入到客户端keystorekeytool -keystore client.keystore.jks -alias CAKafka240 -import -file ca-cert 将已签名的证书导入到客户端keystorekeytool -keystore client.keystore.jks -alias kafka240 -import -file client.cert-signed /<code>
执行完成后,应该会生成以下红框中三个文件:
拷贝两个文件client.keystore.jks、client.truststore.jks到本地:
Producer端代码实例:
<code>package com.hadoop.ljs.kafka010;import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.apache.kafka.common.config.SslConfigs;import java.util.Properties;import java.util.Random;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */public class KafkaSslProducer { public static final String topic="topic1"; public static final String bootstrap_server="10.168.192.31:9093,10.168.192.32:9093"; public static final String client_truststore="D:\\\\kafkaSSL\\\\client.truststore.jks"; public static final String client_keystore="D:\\\\kafkaSSL\\\\client.keystore.jks"; public static final String client_ssl_password="123456"; public static void main(String[] args){ Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server); //configure the following three settings for SSL Encryption props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, client_ssl_password); // configure the following three settings for SSL Authentication props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore); props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password); props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, 0); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer<string> producer = new KafkaProducer<string>(props); TestCallback callback = new TestCallback(); Random rnd = new Random(); for (long i = 0; i <= 2 ; i++) { String key="lujisenKey-" + i; String value="lujisenMessage------------"+i; System.out.println("Send Message: "+"Key:"+key+"Value:"+value); ProducerRecord<string> data = new ProducerRecord<string>( topic, key, value); producer.send(data, callback); } producer.close(); } private static class TestCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { System.out.println("Error while producing message to topic :" + recordMetadata); e.printStackTrace(); } else { String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); System.out.println(message); } } }}/<string>/<string>/<string>/<string>/<code>
Consumer端代码实例:
<code>package com.hadoop.ljs.kafka010;import org.apache.kafka.clients.CommonClientConfigs;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.config.SslConfigs;import java.util.Arrays;import java.util.Collection;import java.util.Collections;import java.util.Properties;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-23 08:58 * @version: v1.0 * @description: com.hadoop.ljs.kafka010 */public class KafkaSslConsumer { public static final String topic="topic1"; public static final String bootstrap_server="10.168.192.31:9093,10.168.192.32:9093"; public static final String client_truststore="D:\\\\kafkaSSL\\\\client.truststore.jks"; public static final String client_keystore="D:\\\\kafkaSSL\\\\client.keystore.jks"; public static final String client_ssl_password="123456"; public static final String consumer_group="group2_topic1"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server); //configure the following three settings for SSL Encryption props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, client_truststore); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, client_ssl_password); //configure the following three settings for SSL Authentication props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, client_keystore); props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, client_ssl_password); props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, client_ssl_password); props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_group); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<byte> consumer = new KafkaConsumer<>(props); TestConsumerRebalanceListener rebalanceListener = new TestConsumerRebalanceListener(); consumer.subscribe(Collections.singletonList(topic), rebalanceListener); while (true) { ConsumerRecords<byte> records = consumer.poll(1000); for (ConsumerRecord<byte> record : records) { System.out.printf("Received Message topic =%s, partition =%s, offset = %d, key = %s, value = %s\\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitSync(); } } private static class TestConsumerRebalanceListener implements ConsumerRebalanceListener { @Override public void onPartitionsRevoked(Collection<topicpartition> partitions) { System.out.println("Called onPartitionsRevoked with partitions:" + partitions); } @Override public void onPartitionsAssigned(Collection<topicpartition> partitions) { System.out.println("Called onPartitionsAssigned with partitions:" + partitions); } }}/<topicpartition>/<topicpartition>/<byte>/<byte>/<byte>/<code>
至此整个Kafka集群的SSL加密认证配置完成,有些地方整理的比较粗,如果问题及时给我在公众号留言,看到后我会及时回复!!!
閱讀更多 JasonLu1986 的文章