A couple of days ago I shared a piece on choosing between Kafka and RocketMQ. Today's notes continue that series: Kafka for high-volume data, covering the test plan and cluster/application monitoring. The content builds on the previous posts (there are video tutorials as well), so follow my account if you want to catch up. Without further ado, here are the hands-on notes!

Previously: "Making a technology choice: Kafka or RocketMQ for data processing? I was completely lost"

Test Plan

1. Add the model
<code>
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserDataSource {

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        props.put("acks", "all");
        props.put("delivery.timeout.ms", 30000);
        props.put("request.timeout.ms", 20000);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<>(props);
        while (true) {
            User user = next();
            byte[] userBinary = ObjectBinaryUtil.toBinary(user);
            ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>("user", user.getId(), userBinary);
            producer.send(record);
            Thread.sleep(200);
            System.out.println("published...");
        }
    }

    private static User next() {
        Random random = new Random();
        return new User(
                random.nextInt(10) + "", // id: "0" .. "9"
                true,                    // gender
                "",                      // name
                1);                      // age
    }
}
</code>
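The producer above references a `User` model that is never shown in these notes. A minimal POJO consistent with the constructor call `new User(id, gender, name, age)` and with the `kafka.user` table in step 5 could look like this (the field names and getter shapes are assumptions, not the original author's code):

```java
// Hypothetical reconstruction of the User model used by the producer;
// the fields mirror the kafka.user table columns id, gender, name, age.
public class User {
    private final String id;      // produced as the Kafka record key
    private final boolean gender; // tinyint(1) in the table
    private final String name;
    private final int age;

    public User(String id, boolean gender, String name, int age) {
        this.id = id;
        this.gender = gender;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public boolean isGender() { return gender; }
    public String getName() { return name; }
    public int getAge() { return age; }
}
```

Note that `getId()` doubles as the Kafka record key, which is why the producer configures a `StringSerializer` for keys and a `ByteArraySerializer` for values.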
2. Generate data

Run `UserDataSource.main()`: it publishes one serialized `User` to the `user` topic roughly every 200 ms, using the producer loop shown in step 1.
3. Create the topic

<code>bin/kafka-topics.sh</code>
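The `kafka-topics.sh` invocation above is truncated. The consumers below iterate over partitions 0-2, so the topic needs 3 partitions; a typical invocation against the ZooKeeper address used elsewhere in these notes would be (replication factor 1 is an assumption for a single-broker test box):

```shell
bin/kafka-topics.sh --create \
  --zookeeper 192.168.90.131:2181 \
  --replication-factor 1 \
  --partitions 3 \
  --topic user
```

On Kafka 2.2+, `--zookeeper` is replaced by `--bootstrap-server 192.168.90.131:9092`.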
4. Add CarConsume
<code>
public static void main(String[] args) {
    String topic = "user";
    List<TopicPartition> partitions = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        TopicPartition p = new TopicPartition(topic, i);
        partitions.add(p);
    }

    String targetTable = "user";
    ExactOnceConsumer exactConsumer = new ExactOnceConsumer(topic, partitions, targetTable);
    exactConsumer.seek();
    exactConsumer.subscribe();
}
</code>
5. Add the kafka.user table

<code>
DROP TABLE user;

CREATE TABLE `user` (
  `topic`  varchar(20) DEFAULT NULL,
  `pid`    int(11)     DEFAULT NULL,
  `offset` mediumtext,
  `id`     int(11)     DEFAULT NULL,
  `gender` tinyint(1)  DEFAULT NULL,
  `name`   varchar(20) DEFAULT NULL,
  `age`    int         DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
</code>
6. Add UserConsume

# Simply copy CarConsume

<code>
public class UserConsume {

    public static void main(String[] args) {
        String topic = "user";
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            partitions.add(new TopicPartition(topic, i));
        }

        String targetTable = "user";
        ExactOnceConsumer exactConsumer = new ExactOnceConsumer(topic, partitions, targetTable);
        exactConsumer.seek();
        exactConsumer.subscribe();
    }
}
</code>
7. Improve seek

The offset in seek() is still hard-coded; it should fetch the latest persisted offset from MySQL:

<code>
-- offset is stored as mediumtext, so "+0" casts it to a number
-- and makes max() compare numerically rather than as strings
SELECT max(offset + 0)
FROM kafka.electrocar
WHERE pid = 1;
</code>
<code>
public long offsetByPartition(TopicPartition p) {
    String sql = String.format("select max(offset+0) from %s where pid=%d",
            this.targetTable, p.partition());
    Statement stat = null;
    try {
        stat = jdbcConn.createStatement();
        ResultSet rs = stat.executeQuery(sql);
        if (rs.next()) {
            return rs.getInt(1);
        }
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        // close the statement on success as well, not only on failure
        if (stat != null) {
            try {
                stat.close();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        }
    }
    return 0;
}
</code>
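With `offsetByPartition` in place, seek() can resume each partition just past the last row persisted to MySQL. The following is a sketch, not the original author's code; `kafkaConsumer` and `partitions` are assumed field names:

```java
// Sketch of wiring offsetByPartition() into seek().
public class SeekSketch {
    // next position to consume: one past the offset stored in MySQL
    public static long nextPosition(long maxStored) {
        return maxStored + 1;
    }
}
/*
 * In ExactOnceConsumer.seek(), roughly:
 *
 *   for (TopicPartition p : partitions) {
 *       long stored = offsetByPartition(p);          // max(offset+0) from MySQL
 *       kafkaConsumer.seek(p, nextPosition(stored));
 *   }
 *
 * Caveat: offsetByPartition() returns 0 both for an empty table and when
 * offset 0 was the last row written, so a strict implementation should
 * check the result set for NULL to distinguish the two cases.
 */
```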
8. Test the offset boundary

<code>
DELETE FROM kafka.electrocar;

-- run CarConsume for a while, then stop it

-- any row returned below indicates a duplicated (pid, offset) pair
SELECT pid, offset, count(*) ct
FROM kafka.electrocar
GROUP BY pid, offset
HAVING ct > 1;
</code>
Monitoring the Cluster and Application

1. Install KafkaOffsetMonitor

Strengths: it needs few permissions, is minimally invasive, and delivers the essential features quickly.

Search for KafkaOffsetMonitor on GitHub.

Note: KafkaOffsetMonitor pulls in some JS/CSS files from the public internet, which can break the web UI if your host cannot reach them.
<code>
java -Xms512M -Xmx512M -Xss1024K \
  -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --port 8088 \
  --zk 192.168.90.131:2181 \
  --refresh 5.minutes \
  --retain 1.day
</code>
KafkaOffsetMonitor can monitor not only cluster state but also our consumption progress, as long as that progress is written to ZK under

/consumers/${group_id}/offsets/${Topic}/${partition_id}

2. Get the latest consumed offset

(1) Where can we read the consumption progress from? MySQL is awkward for this.
Demo03 calls `consumer.commitAsync()`; since the consumer commits its own progress, it must already hold that progress in memory, in `this.subscriptions.allConsumed()`. The `subscriptions` field is private, so we fetch it via reflection:

<code>
Field f = KafkaConsumer.class.getDeclaredField("subscriptions");
f.setAccessible(true);
SubscriptionState subState = (SubscriptionState) f.get(consumer);

// call allConsumed() and walk the result
Map<TopicPartition, OffsetAndMetadata> latestOffsets = subState.allConsumed();
for (TopicPartition p : latestOffsets.keySet()) {
    long offset = latestOffsets.get(p).offset();
    System.out.println(String.format("pid:%d, offset:%d", p.partition(), offset));
}
</code>
Encapsulate it:

<code>
// add a field
private SubscriptionState subState;

private void setSubState() {
    try {
        Field f = KafkaConsumer.class.getDeclaredField("subscriptions");
        f.setAccessible(true);
        this.subState = (SubscriptionState) f.get(this.kafkaConsumer);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        e.printStackTrace();
    }
}

// in the constructor:
setSubState();
System.out.println("Sub state inited...");
</code>
3. Reduce the load on ZK

(1) Is updating ZK in real time a good idea? No: every ZK read and write is a transaction, so per-record updates are expensive. Add a thread that pushes the offsets once every 3 minutes instead:
<code>
public class ZkUptThread extends Thread {
}
</code>

<code>
// latest consumed offsets, updated from the consumer thread
public Map<TopicPartition, Long> imOffsets = new ConcurrentHashMap<>();
// mirror of what has already been written to ZooKeeper
public Map<TopicPartition, Long> zkOffsets = new HashMap<>();
</code>
4. Update the InMemoryOffset

<code>
public void uptIMOffset(SubscriptionState subs) {
    Map<TopicPartition, OffsetAndMetadata> latestOffsets = subs.allConsumed();
    for (TopicPartition p : latestOffsets.keySet()) {
        long offset = latestOffsets.get(p).offset();
        this.imOffsets.put(p, offset);
    }
}
</code>
5. The run() method

When an offset has not changed, there is no need to update ZK:

<code>
@Override
public void run() {
    while (true) {
        try {
            for (Map.Entry<TopicPartition, Long> entry : imOffsets.entrySet()) {
                long imOffset = entry.getValue();
                if (zkOffsets.containsKey(entry.getKey())
                        && zkOffsets.get(entry.getKey()) == imOffset) {
                    continue; // unchanged, skip the ZK write
                } else {
                    uptZk(entry.getKey(), imOffset);
                    zkOffsets.put(entry.getKey(), imOffset);
                }
            }
            Thread.sleep(1000 * 10);
            System.out.println("ZkUpdThread loop once ...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
</code>
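The only branch in run() worth unit-testing is the skip condition: a partition's Znode is rewritten only when the in-memory offset differs from the value last mirrored into zkOffsets. Isolated as a pure helper (the class and method names here are invented for illustration):

```java
import java.util.Map;

public class OffsetMirror {
    // true when zkOffsets already holds exactly this offset for the key,
    // i.e. the Znode write can be skipped this round
    public static <K> boolean upToDate(Map<K, Long> zkOffsets, K key, long imOffset) {
        Long mirrored = zkOffsets.get(key);
        return mirrored != null && mirrored == imOffset; // unboxes for a value comparison
    }
}
```

Comparing the boxed `Long` against the primitive `long` auto-unboxes, so this is a value comparison, not a reference comparison; the same holds in the run() loop above.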
6. Update ZooKeeper

Dependencies:

<code>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.13</version>
        </dependency>
    </dependencies>
</dependencyManagement>
</code>
<code>
private CuratorFramework zkClient;
private final String topic;
private final String groupid;

public ZkUptThread(String topic, String groupid) {
    this.topic = topic;
    this.groupid = groupid;
    RetryPolicy retry = new RetryNTimes(10, 5000);
    zkClient = CuratorFrameworkFactory.newClient("192.168.90.131:2181", retry);
    zkClient.start(); // the client must be started before any ZK calls
}
</code>
<code>
private void uptZk(TopicPartition partition, long offset) {
    String path = String.format("/consumers/%s/offsets/%s/%d",
            groupid, topic, partition.partition());
    try {
        byte[] offsetBytes = String.format("%d", offset).getBytes();
        if (zkClient.checkExists().forPath(path) != null) {
            zkClient.setData().forPath(path, offsetBytes);
            System.out.println("update offset Znode...");
        } else {
            zkClient.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path, offsetBytes);
            System.out.println("add offset Znode...");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
</code>
7. Create the thread in ExactOnceConsumer

<code>
private ZkUptThread zkUptThread;

private void setZkUptThread() {
    zkUptThread = new ZkUptThread(topic, groupid);
    zkUptThread.start();
}

// in the constructor:
setZkUptThread();
System.out.println("uptZK Thread started...");

// after each batch is processed, push the in-memory offsets:
this.zkUptThread.uptIMOffset(subState);
</code>
Production experience to share next time: handling traffic spikes and multi-NIC setups.