Field Notes: How Does Kafka Handle High-Concurrency Writes of Hundreds of Thousands of Messages?

A couple of days ago I shared a post on choosing between Kafka and RocketMQ. Today I'm continuing with Kafka's approach to massive data volumes: the test plan plus cluster and application monitoring. This post builds directly on the previous two, so feel free to follow my account and read them first (there are video tutorials as well). Without further ado, here are the practical notes!

Previous post: Making a technology choice for data processing, Kafka or RocketMQ? I was completely lost.

Test Plan

1. Add the model
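The User model itself is not reproduced in these notes. Below is a minimal sketch consistent with how the producer in this step uses it: it is constructed as new User(id, gender, name, age) and exposes getId(). The remaining getters, field names and the Serializable marker are my assumptions, not taken from the original.

<code>
import java.io.Serializable;

// Hypothetical User model; fields inferred from the kafka.user table below
// (id, gender, name, age) and from how the producer constructs it.
public class User implements Serializable {
    private String id;
    private boolean gender;
    private String name;
    private int age;

    public User(String id, boolean gender, String name, int age) {
        this.id = id;
        this.gender = gender;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public boolean isGender() { return gender; }
    public String getName() { return name; }
    public int getAge() { return age; }
}
</code>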

<code>
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserDataSource {

    public static void main(String args[]) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.131:9092");
        props.put("acks", "all");
        props.put("delivery.timeout.ms", 30000);
        props.put("request.timeout.ms", 20000);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<>(props);

        // publish one random User roughly every 200 ms
        while (true) {
            User user = next();
            byte[] userBinary = ObjectBinaryUtil.toBinary(user);
            ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>("user", user.getId(), userBinary);
            producer.send(record);
            Thread.sleep(200);
            System.out.println("published...");
        }
    }

    private static User next() {
        Random random = new Random();
        return new User(random.nextInt(10) + "", true, "", 1);
    }
}
</code>
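ObjectBinaryUtil is the author's own helper and its source is not included in these notes. A minimal sketch using plain Java serialization (assuming User implements Serializable) could look like this:

<code>
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class ObjectBinaryUtil {

    // Serialize any Serializable object into a byte[] so it can be sent
    // through Kafka's ByteArraySerializer.
    public static byte[] toBinary(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
</code>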

2. Generate data

The producer above is also what generates the test data: its while (true) loop builds a random User through next() roughly every 200 ms and publishes it to the user topic.

<code>
while (true) {
    User user = next();
    byte[] userBinary = ObjectBinaryUtil.toBinary(user);
    ProducerRecord<String, byte[]> record =
            new ProducerRecord<>("user", user.getId(), userBinary);
    producer.send(record);
    Thread.sleep(200);
    System.out.println("published...");
}
</code>

3. Create the topic

<code>
bin/kafka-topics.sh
</code>
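The exact kafka-topics.sh flags are not shown above. As a sketch only, and assuming 3 partitions (the consumers below read partitions 0-2) with replication factor 1, the same topic can be created from Java with the AdminClient:

<code>
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateUserTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.131:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic "user": 3 partitions, replication factor 1 (assumed values)
            NewTopic topic = new NewTopic("user", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
</code>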

4. Add CarConsume

<code>
public static void main(String args[]) {
    String topic = "user";
    List<TopicPartition> partitions = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        TopicPartition p = new TopicPartition(topic, i);
        partitions.add(p);
    }

    String targetTable = "user";
    ExactOnceConsumer exactConsumer = new ExactOnceConsumer(topic, partitions, targetTable);
    exactConsumer.seek();
    exactConsumer.subscribe();
}
</code>
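ExactOnceConsumer comes from the earlier parts of this series and is not reproduced here. Judging only from how it is called (a constructor taking the topic, the partition list and the target table, then seek() and subscribe()), its skeleton might look roughly like the sketch below; every detail in it is an assumption:

<code>
import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactOnceConsumer {
    private final String topic;
    private final List<TopicPartition> partitions;
    private final String targetTable;
    private KafkaConsumer<String, byte[]> kafkaConsumer;   // built in the constructor

    public ExactOnceConsumer(String topic, List<TopicPartition> partitions, String targetTable) {
        this.topic = topic;
        this.partitions = partitions;
        this.targetTable = targetTable;
        // build kafkaConsumer (and the JDBC connection) with the appropriate props here
    }

    public void seek() {
        // assign the fixed partition list; step 7 repositions each partition
        // to the offset recorded in MySQL
        kafkaConsumer.assign(partitions);
    }

    public void subscribe() {
        while (true) {
            ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, byte[]> record : records) {
                // deserialize the value and insert it into targetTable
                // together with record.partition() and record.offset()
                System.out.printf("pid:%d offset:%d%n", record.partition(), record.offset());
            }
        }
    }
}
</code>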

5. Add the kafka.user table

<code>
drop table user;

CREATE TABLE `user` (
  `topic`  varchar(20) DEFAULT NULL,
  `pid`    int(11)     DEFAULT NULL,
  `offset` mediumtext,
  `id`     int(11)     DEFAULT NULL,
  `gender` tinyint(1)  DEFAULT NULL,
  `name`   varchar(20) DEFAULT NULL,
  `age`    int         DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
</code>
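The topic, pid and offset columns exist so that every consumed record is stored together with where it came from, which is what makes the duplicate check in step 8 possible. A hedged sketch of such an insert (the User getters and the UserWriter class name are assumptions, not from the original):

<code>
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class UserWriter {

    // Persist one consumed record together with its partition and offset,
    // so replays can later be detected by grouping on (pid, offset).
    public static void insert(Connection conn, ConsumerRecord<String, byte[]> record, User user)
            throws SQLException {
        String sql = "insert into kafka.user (topic, pid, `offset`, id, gender, name, age) "
                   + "values (?, ?, ?, ?, ?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, record.topic());
            ps.setInt(2, record.partition());
            ps.setLong(3, record.offset());
            ps.setInt(4, Integer.parseInt(user.getId()));
            ps.setBoolean(5, user.isGender());
            ps.setString(6, user.getName());
            ps.setInt(7, user.getAge());
            ps.executeUpdate();
        }
    }
}
</code>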

6. Add UserConsume

Simply copy CarConsume.

<code>
public class UserConsume {

    public static void main(String args[]) {
        String topic = "user";
        List<TopicPartition> partitions = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            TopicPartition p = new TopicPartition(topic, i);
            partitions.add(p);
        }

        String targetTable = "user";
        ExactOnceConsumer exactConsumer = new ExactOnceConsumer(topic, partitions, targetTable);
        exactConsumer.seek();
        exactConsumer.subscribe();
    }
}
</code>

7. Refine seek()

The offset used in seek() is still hard-coded; it should read the latest offset from MySQL instead.

SQL:

<code>
-- offset is stored as text, so "+ 0" casts it to a number before max() is applied
select max(offset + 0) from kafka.electrocar where pid = 1;
</code>
<code>
public long offsetByPartition(TopicPartition p) {
    String sql = String.format("select max(offset + 0) from %s where pid = %d",
            this.targetTable, p.partition());
    Statement stat = null;
    try {
        stat = jdbcConn.createStatement();
        ResultSet rs = stat.executeQuery(sql);
        if (rs.next()) {
            return rs.getInt(1);
        }
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        if (stat != null) {
            try {
                stat.close();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
        }
    }
    return 0;   // no rows yet: start from the beginning
}
</code>
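With offsetByPartition() in place, seek() no longer needs a hard-coded value. A sketch of how it might be wired up, assuming the table stores the offset of the last record already written (an empty table returns 0 and simply starts at the beginning):

<code>
public void seek() {
    kafkaConsumer.assign(partitions);
    for (TopicPartition p : partitions) {
        long lastWritten = offsetByPartition(p);   // highest offset persisted in MySQL
        // resume right after the last record that made it into the table
        kafkaConsumer.seek(p, lastWritten == 0 ? 0 : lastWritten + 1);
    }
}
</code>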

8. Test the offset boundary

<code>
delete from kafka.electrocar;

-- run CarConsume, then stop it

select pid, offset, count(*) ct
from kafka.electrocar
group by pid, offset
having ct > 1;
</code>

If the query returns no rows, no record was written more than once, so the offset boundary is handled correctly.

Monitoring the Cluster and the Application

1. Install KafkaOffsetMonitor

Its strengths: it needs only limited permissions, is minimally intrusive, and delivers the essential features quickly.

Search for KafkaOffsetMonitor on GitHub.

Note: KafkaOffsetMonitor pulls in some JS/CSS files from the public internet, so the web UI may break if those hosts are unreachable.

<code>
java -Xms512M -Xmx512M -Xss1024K \
     -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --port 8088 \
     --zk 192.168.90.131:2181 \
     --refresh 5.minutes \
     --retain 1.da
</code>

KafkaOffsetMonitor can monitor not only the cluster state but also our consumption progress, as long as we write that progress to the ZooKeeper path

/consumers/${group_id}/offsets/${Topic}/${partition_id}
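To double-check what ends up under that path, the znode can be read back with the same Curator client used later in these notes. This is only a verification sketch; the group id here is a placeholder:

<code>
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

public class OffsetZnodeCheck {
    public static void main(String[] args) throws Exception {
        RetryPolicy retry = new RetryNTimes(10, 5000);
        CuratorFramework zk = CuratorFrameworkFactory.newClient("192.168.90.131:2181", retry);
        zk.start();
        // layout: /consumers/${group_id}/offsets/${Topic}/${partition_id}
        String path = "/consumers/my-group/offsets/user/0";   // placeholder group id
        byte[] data = zk.getData().forPath(path);
        System.out.println("offset in ZK: " + new String(data));
        zk.close();
    }
}
</code>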

2. Get the latest consumption progress

(1) Where can we get the consumption progress? Reading it back from MySQL is awkward.

<code>
// Demo03: the consumer can commit offsets (consumer.commitAsync()),
// so the latest progress must already be held in the consumer's memory.
// This call returns it:
this.subscriptions.allConsumed();

// subscriptions is a private field of KafkaConsumer, so it cannot be used
// directly; grab it via reflection instead:
Field f = KafkaConsumer.class.getDeclaredField("subscriptions");
f.setAccessible(true);
SubscriptionState subState = (SubscriptionState) f.get(consumer);

// call allConsumed() and iterate over the result:
Map<TopicPartition, OffsetAndMetadata> latestOffsets = subState.allConsumed();
for (TopicPartition p : latestOffsets.keySet()) {
    if (latestOffsets.containsKey(p)) {
        long offset = latestOffsets.get(p).offset();
        System.out.println(String.format("pid:%d,offset:%d", p.partition(), offset));
    }
}
</code>
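Reflection into KafkaConsumer internals can break between client versions. A non-reflective alternative (my suggestion, not part of the original notes) is to ask the consumer for its position on each assigned partition:

<code>
// position() returns the offset of the next record that will be fetched,
// i.e. the current consumption progress for that partition.
for (TopicPartition p : consumer.assignment()) {
    long nextOffset = consumer.position(p);
    System.out.println(String.format("pid:%d,offset:%d", p.partition(), nextOffset));
}
</code>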

Encapsulate it:

<code>
// add a field
private SubscriptionState subState;

private void setSubState() {
    try {
        Field f = KafkaConsumer.class.getDeclaredField("subscriptions");
        f.setAccessible(true);
        this.subState = (SubscriptionState) f.get(this.kafkaConsumer);
    } catch (NoSuchFieldException e) {
        e.printStackTrace();
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
}

// during initialization:
setSubState();
System.out.println("Sub state inited...");
</code>

3. Reduce the load on ZooKeeper

(1) Is updating ZooKeeper in real time a good idea? No: every ZooKeeper read and write is a transaction, so frequent updates are expensive.

Instead, add a dedicated thread that flushes the offsets periodically (the notes suggest every 3 minutes; the demo below sleeps 10 seconds per loop). Add:

<code>
public class ZkUptThread extends Thread {
}
</code>



<code>
// in-memory offsets, updated by the consumer thread
public Map<TopicPartition, Long> imOffsets = new ConcurrentHashMap<>();
// offsets already written to ZooKeeper, so unchanged partitions can be skipped
public Map<TopicPartition, Long> zkOffsets = new HashMap<>();
</code>

4. Update the in-memory offsets

<code>
public void uptIMOffset(SubscriptionState subs) {
    Map<TopicPartition, OffsetAndMetadata> latestOffsets = subs.allConsumed();
    for (TopicPartition p : latestOffsets.keySet()) {
        if (latestOffsets.containsKey(p)) {
            long offset = latestOffsets.get(p).offset();
            this.imOffsets.put(p, offset);
        }
    }
}
</code>

5. Logic of the run() method

If an offset has not changed, there is no need to update ZooKeeper.

<code>
@Override
public void run() {
    while (true) {
        try {
            for (Map.Entry<TopicPartition, Long> entry : imOffsets.entrySet()) {
                long imOffset = entry.getValue();
                // skip partitions whose offset has not moved since the last write
                if (zkOffsets.containsKey(entry.getKey())
                        && zkOffsets.get(entry.getKey()) == imOffset) {
                    continue;
                } else {
                    uptZk(entry.getKey(), imOffset);
                    zkOffsets.put(entry.getKey(), imOffset);
                }
            }
            Thread.sleep(1000 * 10);
            System.out.println("ZkUpdThread loop once ...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
</code>

6. Update ZooKeeper

Dependencies:

<code>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.13</version>
        </dependency>
    </dependencies>
</dependencyManagement>
</code>
<code>
private CuratorFramework zkClient;

public ZkUptThread() {
    // retry up to 10 times, 5 seconds apart
    RetryPolicy retry = new RetryNTimes(10, 5000);
    zkClient = CuratorFrameworkFactory.newClient("192.168.90.131:2181", retry);
    zkClient.start();   // the client must be started before it can be used
}
</code>
<code>
private void uptZk(TopicPartition partition, long offset) {
    // path layout expected by KafkaOffsetMonitor
    String path = String.format("/consumers/%s/offsets/%s/%d",
            groupid, topic, partition.partition());
    try {
        byte[] offsetBytes = String.format("%d", offset).getBytes();
        if (zkClient.checkExists().forPath(path) != null) {
            zkClient.setData().forPath(path, offsetBytes);
            System.out.println("update offset Znode...");
        } else {
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .forPath(path, offsetBytes);
            System.out.println("add offset Znode...");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
</code>

7. Create the thread in ExactOnceConsumer

<code>
private ZkUptThread zkUptThread;

private void setZkUptThread() {
    zkUptThread = new ZkUptThread(topic, groupid);
    zkUptThread.start();
}

// during initialization:
setZkUptThread();
System.out.println("uptZK Thread started...");

// whenever progress should be refreshed (e.g. after a poll), pass the SubscriptionState:
this.zkUptThread.uptIMOffset(subState);
</code>

Production experience: traffic spikes and multi-NIC solutions




Click the extended link to watch the video tutorial that goes with these notes!

