kafka消息服務器使用場景 90% 緩衝 消息通訊組件
1、jd訂單系統
2、解決數據的併發寫
Topic :消息分類,以日誌分區存儲,每一個分區都會有leader和Follower
Record:組成Topic的基本單元,一則消息 key value ts
broker:運行Kafka服務-broker 一臺機器只運行一個Broker
Producer:生產消息
Consumer:消費消息
ConsumerGroup:把消費者歸類,同一Group的消費者默認是對Topic分區消息實現負載均衡(Fair Shard)
不同Group之間相互不影響,消息是以廣播的形式發佈。
Kafka消息服務搭建:
1.解壓到/usr目錄
[root@CentOS ~]# tar zxf kafka_2.11-0.11.0.0.tgz -C /usr/
2.創建軟連接(可選)
[root@CentOS ~]# ln -s /usr/kafka_2.11-0.11.0.0/ kafka
3.啟動zookeeper (對broker監控,記錄集群的元數據信息)
4.搭建kafka的偽分佈式
1.創建三個kafka的配置文件
[root@CentOS config]# cp server.properties server-1.properties
[root@CentOS config]# cp server.properties server-2.properties
[root@CentOS config]# cp server.properties server-3.properties
修改配置文件:
server-1.peroperties
broker.id=0 -- 一個kafka實例唯一標示必須唯一
delete.topic.enable=true --允許用戶刪除topic
listeners=PLAINTEXT://CentOS:9092 --因為kafka服務是通過TCP/IP實現
log.dirs=/tmp/kafka-logs-1 --配置kafka消息存儲路徑
zookeeper.connect=CentOS:2181
server-2.peroperties
broker.id=1 -- 一個kafka實例唯一標示必須唯一
delete.topic.enable=true --允許用戶刪除topic
listeners=PLAINTEXT://CentOS:9093 --因為kafka服務是通過TCP/IP實現
log.dirs=/tmp/kafka-logs-2 --配置kafka消息存儲路徑
zookeeper.connect=CentOS:2181
server-3.peroperties
broker.id=3 -- 一個kafka實例唯一標示必須唯一
delete.topic.enable=true --允許用戶刪除topic
listeners=PLAINTEXT://CentOS:9094 --因為kafka服務是通過TCP/IP實現
log.dirs=/tmp/kafka-logs-3 --配置kafka消息存儲路徑
zookeeper.connect=CentOS:2181
啟動Kafka
start.sh
echo 'start kafka cluster..'
for i in {1..3}
do
/usr/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh /usr/kafka_2.11-0.11.0.0/config/server-$i.properties 1>/dev/null 2>&1 &
sleep 5
done
shutdown.sh
for i in `jps|grep Kafka | awk '{print $1}'`
do
echo 'kill kafka '$i
kill -9 $i
done
Topic管理
創建Topic
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --create --topic topic01 --partitions 3 --replication-factor 2
查看所有Topic
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --list
查看Tpoic詳情
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --describe --topic topic01
Topic:topic01PartitionCount:3ReplicationFactor:2Configs:
Topic: topic01Partition: 0Leader: 2Replicas: 2,0Isr: 2,0
Topic: topic01Partition: 1Leader: 0Replicas: 0,1Isr: 0,1
Topic: topic01Partition: 2Leader: 1Replicas: 1,2Isr: 1,2
修改分區
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --alter --topic topic01 --partitions 4
刪除
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --zookeeper CentOS:2181 --delete --topic topic01
Topic topic01 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
訂閱消息:
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning
生產消息
[root@CentOS kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01
介紹:http://kafka.apache.org/documentation/
Java集成Kafka
<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>0.11.0.0/<version>
1.消費方offset管理
①:手動一定offset
kafkaConsumer.seek(new TopicPartition("t_user_topic",part),offset);
②kafkaConsumer.commitAsync();
提交指定分區的offset
①props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
②將所有的消息的消費放置在try{}catch(Exception e)
try{
//消費消息
//提交offset+1
}catch(Exception e){
//seek到上一次消費的消息
}
提交到指定分區
Map<topicpartition> offsetMetaMap=new HashMap<topicpartition>() ;/<topicpartition>/<topicpartition>
OffsetAndMetadata offsetMeta=new OffsetAndMetadata(offset+1);//必須必當前offset大1
offsetMetaMap.put(new TopicPartition("topic01",分區號),offsetMeta);
kafkaConsumer.commitAsync(offsetMetaMap,null);
2、如何使用Kafka發送複雜消息
Deserializer、Serializer接口
3.如何實現Partition數據的負載均衡
①生產方:實現 Partitioner
②消費方:
1.subscribe fair shared 均分策略
2.assign 指定指定分區
閱讀更多 JackYang1993 的文章