Apache Kafka Streams Word Count

Introduction#

Apache Kafka is a distributed streaming platform. A streaming platform has to support three key capabilities:

  • Publish and subscribe to streams of records, much like a message queue
  • Store streams of records durably, for fault tolerance
  • Process records promptly as they appear in the stream

Kafka runs as a cluster on one or more servers and stores streams of records in categories called topics; each record consists of a key, a value, and a timestamp.

Kafka provides four APIs: Producer, Consumer, Streams, and Connector. The Producer API publishes records, the Consumer API consumes them, the Streams API provides stream-processing capabilities, and the Connector API moves data between Kafka and other systems.


Installing Kafka#

Download Kafka and extract it into a local directory:

```
> tar -xzf kafka_2.12-2.2.0.tgz
> cd kafka_2.12-2.2.0/
```

Starting Kafka#

Kafka depends on ZooKeeper, so ZooKeeper must be running before the broker starts. Kafka ships with zookeeper-server-start.sh, which launches a single-node ZooKeeper instance.
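For example, using the default ZooKeeper configuration that ships in the distribution's `config/` directory:

```
zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
```

With ZooKeeper up, start the broker itself: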

```
kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2019-07-06 00:31:59,766] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-07-06 00:32:00,836] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-07-06 00:32:00,836] INFO starting (kafka.server.KafkaServer)
[2019-07-06 00:32:00,838] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-07-06 00:32:00,881] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
[2019-07-06 00:32:00,896] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:java.version=1.8.0_181 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:00,897] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)
...
[2019-07-06 00:32:01,006] INFO Client environment:java.compiler= (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:os.version=10.14.6 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:user.name=jet (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:user.home=/Users/jet (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,006] INFO Client environment:user.dir=/Users/jet (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,015] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@15c43bd9 (org.apache.zookeeper.ZooKeeper)
[2019-07-06 00:32:01,100] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-07-06 00:32:06,119] INFO Opening socket connection to server localhost/fe80:0:0:0:0:0:0:1%1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-07-06 00:32:06,224] INFO Socket connection established to localhost/fe80:0:0:0:0:0:0:1%1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-07-06 00:32:06,284] INFO Session establishment complete on server localhost/fe80:0:0:0:0:0:0:1%1:2181, sessionid = 0x100292d2f990000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-07-06 00:32:06,315] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-07-06 00:32:07,075] INFO Cluster ID = 4mgiWFzSTfKoRznZ6a-zsw (kafka.server.KafkaServer)
[2019-07-06 00:32:07,298] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    ...
    unclean.leader.election.enable = false
    zookeeper.connect = localhost:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-07-06 00:32:07,396] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-07-06 00:32:07,399] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-07-06 00:32:07,408] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-07-06 00:32:07,550] INFO Loading logs. (kafka.log.LogManager)
[2019-07-06 00:32:07,847] INFO [Log partition=test-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2019-07-06 00:32:07,874] INFO [Log partition=test-0, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 162 ms (kafka.log.Log)
[2019-07-06 00:32:07,906] INFO Logs loading complete in 355 ms. (kafka.log.LogManager)
[2019-07-06 00:32:08,003] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-07-06 00:32:08,007] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-07-06 00:32:09,228] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-07-06 00:32:09,760] INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
[2019-07-06 00:32:09,767] INFO [SocketServer brokerId=0] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2019-07-06 00:32:10,124] INFO [ExpirationReaper-0-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,131] INFO [ExpirationReaper-0-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,201] INFO [ExpirationReaper-0-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,247] INFO [ExpirationReaper-0-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:10,553] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-07-06 00:32:11,293] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-07-06 00:32:11,389] INFO Stat of the created znode at /brokers/ids/0 is: 55,55,1562344331342,1562344331342,1,0,0,72102868086751232,188,0,55 (kafka.zk.KafkaZkClient)
[2019-07-06 00:32:11,397] INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 55 (kafka.zk.KafkaZkClient)
[2019-07-06 00:32:11,560] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:11,568] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:11,569] INFO [ExpirationReaper-0-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-07-06 00:32:11,607] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-07-06 00:32:11,609] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-07-06 00:32:11,618] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-07-06 00:32:11,639] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:1000,blockEndProducerId:1999) by writing to Zk with path version 2 (kafka.coordinator.transaction.ProducerIdManager)
[2019-07-06 00:32:11,696] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-07-06 00:32:11,700] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-07-06 00:32:11,700] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-07-06 00:32:14,848] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-07-06 00:32:14,959] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-07-06 00:32:15,022] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-06 00:32:15,022] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-06 00:32:15,022] INFO Kafka startTimeMs: 1562344334967 (org.apache.kafka.common.utils.AppInfoParser)
[2019-07-06 00:32:15,035] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2019-07-06 00:32:15,535] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(test-0) (kafka.server.ReplicaFetcherManager)
[2019-07-06 00:32:15,584] INFO Replica loaded for partition test-0 with initial high watermark 0 (kafka.cluster.Replica)
[2019-07-06 00:32:15,593] INFO [Partition test-0 broker=0] test-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
```

Creating a topic#

Use --partitions to set the number of partitions and --replication-factor to set how many replicas each partition gets:

```
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
```

List the topics on the current node:

```
kafka-topics.sh --list --bootstrap-server localhost:9092
test
```

Sending messages with a producer#

kafka-console-producer.sh publishes whatever it reads from standard input to the given topic:

```
kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
```

Consuming messages with a consumer#

kafka-console-consumer.sh consumes the messages in the corresponding topic:

```
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
```
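With a broker running, you can also try the Streams word count that gives this post its title. A sketch following the official quickstart; the demo class org.apache.kafka.streams.examples.wordcount.WordCountDemo and the topic names streams-plaintext-input / streams-wordcount-output are the ones bundled with the distribution and may vary across versions:

```
# input/output topics the bundled demo expects
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic streams-wordcount-output --config cleanup.policy=compact

# run the demo: it tails streams-plaintext-input, splits each line into words,
# and writes running counts to streams-wordcount-output
kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

# in another terminal, feed it some lines ...
kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

# ... and read back the counts (keys are words, values are Long counts)
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```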

Cluster deployment#

The examples so far ran a single Kafka node; next, bring up two more nodes so the three run as a cluster:

```
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-1.properties
cp $KAFKA_HOME/config/server.properties $KAFKA_HOME/config/server-2.properties

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
```

broker.id identifies a server and must be unique within the cluster; if two nodes are started with the same id, the second one will fail to start. Since all three brokers share one machine here, listeners and log.dirs must differ as well.

Start server-1 and server-2:

```
kafka-server-start.sh $KAFKA_HOME/config/server-1.properties &
kafka-server-start.sh $KAFKA_HOME/config/server-2.properties &
```
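The describe output below assumes a topic named new-topic replicated across all three brokers; if it has not been created yet, create it with a replication factor of 3:

```
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic new-topic
```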

Inspect the topic's details:

```
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic new-topic
Topic:new-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
    Topic: new-topic    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
```
  • leader: the broker currently acting as leader for this partition (here the node with broker.id 2)
  • replicas: all brokers that replicate this partition, regardless of whether they are the leader or even alive
  • isr: the set of replicas currently in sync with the leader

If we terminate the leader, a new leader is elected. Find broker 2's PID first, then kill it:

```
ps aux | grep server-2.properties   # find the leader broker's PID
kill -9 <pid>                       # then terminate it
```

Describing the topic again shows that leadership has moved to broker 1; replicas still lists the killed broker 2, but the ISR no longer contains it:

```
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic new-topic
Topic:new-topic    PartitionCount:1    ReplicationFactor:3    Configs:segment.bytes=1073741824
    Topic: new-topic    Partition: 0    Leader: 1    Replicas: 2,1,0    Isr: 1,0
```

Importing and exporting data with Kafka Connect#

Kafka Connect is a tool that ships with Kafka for importing data into, and exporting data out of, Kafka.

  • Import a file into a Kafka topic (and export the topic back out to another file)

```
# create a source file
echo -e "import test\n import data" > test.txt

# the command below imports each line of `test.txt` into the topic `connect-test`,
# then exports the messages in `connect-test` to the file `test.sink.txt`
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties $KAFKA_HOME/config/connect-file-sink.properties
```

connect-standalone.properties holds the Kafka connection settings and specifies the serialization format for keys and values:

```
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```

connect-file-source.properties configures the input side: the target file is processed line by line and written into the specified topic:

```
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
```

connect-file-sink.properties configures the output side: messages from the specified topic are written out to a file:

```
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```

The pipeline above can also be split into a standalone import and a standalone export:

```
## import
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-source.properties

## export
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-file-sink.properties
```

While the import and export are running, the messages flowing through the topic can be inspected in real time with a consumer.
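For example, a console consumer tailing connect-test; since the converters above set schemas.enable=true, each line arrives wrapped in a JSON schema envelope (the output below is illustrative):

```
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"import test"}
{"schema":{"type":"string","optional":false},"payload":" import data"}
```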

How to delete a topic#

A topic's data is stored in the log directory set in Kafka's configuration, while its metadata is stored in ZooKeeper, so deleting a topic completely requires removing both Kafka's data files and the related ZooKeeper nodes.
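Note that a single topic can normally be removed directly with the CLI (subject to delete.topic.enable, which defaults to true in modern Kafka versions); the manual steps below are the heavier option that wipes all topic state:

```
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test
```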

  • Stop the Kafka service
```
kafka-server-stop.sh
```
  • Delete Kafka's data directories (the log.dirs paths configured above)
```
rm -rf /tmp/kafka-logs*
```
  • Delete the Kafka-related information stored in ZooKeeper
```
zkCli.sh -server localhost:2181     # connect to ZooKeeper

ls /                                # list the root znodes
[admin, brokers, cluster, config, consumers, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]

# delete the Kafka-related znodes (on ZooKeeper 3.4.x zkCli, use `rmr` instead of `deleteall`)
[zk: localhost:2181(CONNECTED) 1] deleteall /admin
[zk: localhost:2181(CONNECTED) 2] deleteall /brokers
[zk: localhost:2181(CONNECTED) 3] deleteall /cluster
[zk: localhost:2181(CONNECTED) 4] deleteall /consumers
[zk: localhost:2181(CONNECTED) 5] deleteall /config
[zk: localhost:2181(CONNECTED) 6] deleteall /isr_change_notification
[zk: localhost:2181(CONNECTED) 7] deleteall /latest_producer_id_block
[zk: localhost:2181(CONNECTED) 8] deleteall /log_dir_event_notification
# /zookeeper holds ZooKeeper's own metadata; leave it alone
```

