架構師技能樹之——kafka


架構師技能樹之——kafka


什麼是Kafka

<code>Apache Kafka® is a distributed streaming platformA streaming platform has three key capabilities:Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.Store streams of records in a fault-tolerant durable way.Process streams of records as they occur.Kafka is generally used for two broad classes of applications:Building real-time streaming data pipelines that reliably get data between systems or applicationsBuilding real-time streaming applications that transform or react to the streams of dataTo understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.First a few concepts:Kafka is run as a cluster on one or more servers that can span multiple datacenters.The Kafka cluster stores streams of records in categories called topics.Each record consists of a key, a value, and a timestamp./<code>

Kafka是一款分佈式消息發佈和訂閱系統,它的特點是高性能、高吞吐量。

最早設計的目的是作為LinkedIn的活動流和運營數據的處理管道。這些數據主要是用來對用戶做用戶畫

像分析以及服務器性能數據的一些監控所以kafka一開始設計的目標就是作為一個分佈式、高吞吐量的

消息系統,所以適合運用在大數據傳輸場景。如的開源分佈式處理系統如cloudera 、Storm 、Spark、

Flink 等都支持與 Kafka 集成

Kafka 起初是由 Linkedin 公司採用 Scala 語言開發的 個多分區、多副本且基於 ZooKeeper 協調的分佈式消息系統

目前 Kafka 已經定位為一個分佈式流式處理平臺,它以高吞吐、可持久化、可水平擴展、支持流數據處理等多種特性而被廣泛使用。

  • 消息系統: kafka 和傳統的消息系統(也稱作消息中間件〉都具備系統解稿、冗餘存儲、流量削峰、緩衝、異步通信、擴展性、 可恢復性等功能。與此同時, Kafka供了大多數消息系統難以實現的消息順序性保障及回溯消費的功能
  • 存儲系統: Kafka 把消息持久化到磁盤,相比於其他基於內存存儲的系統而言,有效地降低了數據丟失的風險 也正是得益於 Kafka 的消息持久化功能和多副本機制,我們可以把 Kafka 作為長期的數據存儲系統來使用,只需要把對應的數據保留策略設置為“永久”或啟用主題的日誌壓縮功能即可
  • 流式處理平臺: Kafka 不僅為每個流行的流式處理框架提供了可靠的數據來源,還供了一個完整的流式處理類庫,比如窗口、連接、變換和聚合等各類操

Kafka的企業應用場景

由於kafka具有更好的吞吐量、內置分區、冗餘及容錯性的優點(kafka每秒可以處理幾十萬消息),讓

kafka成為了一個很好的大規模消息處理應用的解決方案。所以在企業級應用長,主要會應用於如下幾

個方面

行為跟蹤:kafka可以用於跟蹤用戶瀏覽頁面、搜索及其他行為。通過發佈-訂閱模式實時記錄到對應的

topic中,通過後端大數據平臺接入處理分析,並做更進一步的實時處理和監控


日誌收集:日誌收集方面,有很多比較優秀的產品,比如Apache Flume,很多公司使用kafka代理日誌

聚合。日誌聚合表示從服務器上收集日誌文件,然後放到一個集中的平臺(文件服務器)進行處理。在

實際應用開發中,我們應用程序的log都會輸出到本地的磁盤上,排查問題的話通過linux命令來搞定,

如果應用程序組成了負載均衡集群,並且集群的機器有幾十臺以上,那麼想通過日誌快速定位到問題,

就是很麻煩的事情了。所以一般都會做一個日誌統一收集平臺管理log日誌用來快速查詢重要應用的問

題。所以很多公司的套路都是把應用日誌集中到kafka上,然後分別導入到es和hdfs上,用來做實時檢

索分析和離線統計數據備份等。而另一方面,kafka本身又提供了很好的api來集成日誌並且做日誌收集

在這裡插入圖片描述

架構師技能樹之——kafka

kafka架構

架構師技能樹之——kafka

一個典型的kafka集群包含若干Producer(可以是應用節點產生的消息,也可以是通過Flume收集日誌產生的事件),若干個Broker(kafka支持水平擴展)、若干個Consumer Group,以及一個zookeeper集群。kafka通過zookeeper管理集群配置及服務協同。Producer使用push模式將消息發佈到broker,consumer通過監聽使用pull模式從broker訂閱並消費消息。

多個broker協同工作,producer和consumer部署在各個業務邏輯中。三者通過zookeeper管理協調請求和轉發。這樣就組成了一個高性能的分佈式消息發佈和訂閱系統。

圖上有一個細節是和其他mq中間件不同的點,producer 發送消息到broker的過程是push,而consumer從broker消費消息的過程是pull,主動去拉數據。而不是broker把數據主動發送給consumer

名詞解釋

  • 1)Broker 服務代理節點

Kafka集群包含一個或多個服務器,這種服務器被稱為broker。broker端不維護數據的消費狀態,提升

了性能。直接使用磁盤進行存儲,線性讀寫,速度快:避免了數據在JVM內存和系統內存之間的複製,

減少耗性能的創建對象和垃圾回收。

  • 2)Producer

負責發佈消息到Kafka broker

  • 3)Consumer

消息消費者,向Kafka broker讀取消息的客戶端,consumer從broker拉取(pull)數據並進行處理。

  • 4)Topic

每條發佈到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存

儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消

費數據而不必關心數據存於何處)

  • 5)Partition

Parition是物理上的概念,每個Topic包含一個或多個Partition.

  • 6)Consumer Group

每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定

group name則屬於默認的group)

  • 7)Topic & Partition

Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把

這條消息放進哪個queue裡。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個

Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文

件。若創建topic1和topic2兩個topic,且分別有13個和19個分區,則整個集群上會相應會生成共32個

文件夾(本文所用集群共8個節點,此處topic1和topic2 replication-factor均為1)。


Kafka安裝部署

安裝kakfa之前需要安裝jdk和zookeeper

版本:

jdk8

zookeeper 3.4.12

kafka 2.11-2.0.0.tgz

1 jdk安裝

kafka是scale語言編寫,JVM系語言,所以需要安裝jdk

<code># 下載jdk-8u181-linux-x64.tar.gz# 解壓到/usr/javatar -zxvf jdk-8u181-linux-x64.tar.gz# 環境變量 vim /etc/profileexport JAVA_HOME=/usr/java/jdk1.8.0_181export JRE_HOME=$JAVA_HOME/jreexport PATH=$PATH:$JAVA_HOME/binexport CLASSPATH=./://$JAVA_HOME/bin:$JRE_HOME/lib# 配置生效source /etc/profile# 驗證java -version/<code>

2 zookeper安裝

Zoo Keeper 是安裝 Kafka 集群的必要組件, afka 通過 ZooKeeper 來實施對元數據信息,包括集群 broker 、主題、 分區等內容。

ZooKeeper 是一個開源的分佈式協調服務,是 google Chubby 個開源實現。分佈式應用程序可基ZooKeeper 實現諸如數據發佈/訂閱 、負載均衡、 命名服務、分佈式協調/通知,集群管理、 Master 舉、配置維護等功能。在 ZooKeeper 中共有3個角色: leader follower和observer,同一時刻 ZooKeeper 集群中只會有一個 leader ,其他的都 follower和observer。observer不參與投票,默認情況下 ZooKeeper 中只有 leader和follewer兩個角色

<code># 下載 zookeeper-3.4.12.tar.gz 到 /opt# 解壓tar -zxvf zookeeper-3.4.12.tar.gz# 進入zookeepr目錄cd zookeeper-3.4.12# 配置環境變量 /etc/profileexport ZOOKEEPER_HOME=/opt/zookeeper-3.4.12export PATH=$PATH:$ZOOKEEPER_HOME/bin# 使其生效source /etc/profile# 配置文件 進入$ZOOKEEPER_HOME/confcd confcp zoo_sample.cfg zoo.cfg# 配置zoo.cfg#####################zoo.cfg###################### Zookeeper服務器心跳時間,mstickTime=2000# 投票選舉新leader的初始化時間initLimit=10# leader於follower心跳檢測最大容忍時間,響應超過syncLimit*ticeTime,leader認為# follower “死掉",從服務器列表中刪除followersyncLimit=5# 數據目錄dataDir=/tmp/zookeeper/data# 日誌目錄dataLogDir=/tmp/zookeeper/log# Zookeeper對外服務端口clientPort=2181################################################## 創建數據目錄和日誌目錄mkdir -p /tmp/zookeeper/datamkdir -p /tmp/zookeeper/log# 在${dataDir}目錄(/tmp/zookeeper/data)下創建一個myid文件,並寫入# 一個數值,比如0。myid文件裡存放的是服務器的編號# 啟動服務zkServer.sh start# 查看服務狀態 zkServer.sh status# zk集群 生成環境都是使用集群# /etc/hosts 192.168.0.111 node1192.168.0.112 node2192.168.0.113 node3# 在3臺機器的zoo.cfg中添加以下配置server.0=192.168.0.111:2888:3888server.1=192.168.0.112:2888:3888server.2=192.168.0.112:2888:3888/<code>

3 kafka安裝

<code># 下載 kafka_2.11-2.0.0.tgz# 解壓到/opttar -zxvf kafka_2.11-2.0.0.tgzcd kafka_2.11-2.0.0# Kafka的根目錄$KAFKA_HOME,即/opt/kafka_2.11-2.0.0 # 環境配置 /etc/profileexport KAFKA_HOME=/opt/kafka_2.11-2.0.0export PATH=$PATH:$KAFKA_HOME/bin# 使其生效source /etc/profile# 修改配置文件 $KAFKA_HOME/conf/server.properties###################conf/server.properties################# broker的編號,如果有多個broker,則每個broker的編號需要設置不同broker.id=0# broker對外提供的服務入口地址listeners=PLAINTEXT://localhost:9092# 存放消息日誌文件的地址log.dirs=/tmp/kafka-logs# kafka所需要的Zooper集群地址,單機為了實驗kafka和zk安裝本機上zookeeper.connect=localhost:2181/kafka########################################################## 啟動kafka,在$KAFKA_HOME目錄bin/kafka-server-start.sh config/server.properties# 後臺啟動bin/kafka-server-start.sh -daemon config/server.properties # 或bin/kafka-server-start.sh config/server.properties & # 停止kafkabin/kafka-server-stop.sh -daemon config/server.properties ########################集群##########################192.168.0.111192.168.0.112192.168.0.113# 三臺環境都如上面下操作kafka# 分別修改三臺機器的server.properties,在同一個集群中每臺機器id必須唯一broker.id=0broker.id=1broker.id=2# 修改zooper.connect 驗證zk安裝在111機器上zookeeper.connect=192.168.0.111:2181# 修改listeners配置# 如果配置了listeners,那麼消息生產者和消費者都會使用listeners的配置來進行消息的收發,否則# 會使用localhost# PLAINTEXT表示協議,默認明文,可以選擇其他加密協議listeners=PLAINTEXT://192.168.0.111:9092# 分別啟動三臺服務器sh kafka-server-start.sh -daemon ../config/server.properties/<code> 

kafka的基本操作

創建topic

<code>sh kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 -- partitions 1 --topic test/<code>

Replication-factor 表示該topic需要在不同的broker中保存幾份,這裡設置成1,表示在兩個broker中保存兩份

Partitions 分區數

查看topic

<code>sh kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic first_topic/<code>

消費消息

<code>sh kafka-console-consumer.sh --bootstrap-server 192.168.13.106:9092 --topic test --from-beginning/<code>

發送消息

<code>sh kafka-console-producer.sh --broker-list 192.168.244.128:9092 --topic first_topic/<code>

————————————————

原文鏈接:https://blog.csdn.net/liulong1010/article/details/103842244


分享到:


相關文章: