關於kafka集羣,程式設計師的學習筆記,超詳細,看過都說好

kafka

kafka是個消息隊列系統,用於消息的發佈和訂閱。

核心概念

kafka是個集群,意味著有很多節點組成。

broker,每個節點稱作broker。

message,指的是發佈到kafka的每一條記錄。

topic,指的是message的分類。方便發佈者和訂閱者根據需要讀取不同的內容。比如可以分為訂單topic、日誌topic等。

partition,指的是topic內部按照message劃分為不同的類型,一個message只能屬於一個topic中的某一個partition。目的是為了讓同一個topic的數據分散在不同的broker上,實現負載均衡。比如訂單topic可以按照ip分為不同的partition,一個partition中的數據在一個broker中存儲。

replica,指的是同一個topic內部消息存放多少份,每一份存放在不同的broker中。目的是為了備份,保證數據安全。

leader,指的接收客戶讀寫請求的broker。

follower,指的是同步數據的broker。

producer,指的是發佈消息。

consumer,指的是訂閱消息。

offset(偏移量),偏移量實際上就是數據的索引,類似於數組中的下標0、1、2、3。

消費方式

消費者需要分組。

同一組內的消費者只能有一個消費者讀取同一條消息;目的是為了防止多線程讀取數據的時候,產生重複讀的問題。

不同組之間的消費者可以多次讀取同一條消息;目的是為了共享消息。

安裝

修改配置文件config/server.properties

修改以下參數

broker.id

host.name

log.dirs

zookeeper.connect

delete.topic.enable=true

把kafka複製到其他節點後,記得修改server.properties中的broker.id和host.name的值。

啟動命令

nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

命令行操作

創建主題

bin/kafka-topics.sh --create --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --replication-factor 1 --partitions 1 --topic test

描述主題

bin/kafka-topics.sh --describe --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test

刪除主題

bin/kafka-topics.sh --delete --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test

生產者

bin/kafka-console-producer.sh --broker-list 192.168.1.100:9092,192.168.1.101:9092,192.168.1.02:9092 --topic test

消費者

bin/kafka-console-consumer.sh --zookeeper 192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181 --topic test --from-beginning

查看消費進度

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.1.100:2181 --topic wuchao -group group1

關於kafka集群,程序員的學習筆記,超詳細,看過都說好

java操作

依賴

 org.apache.kafka kafka_2.11 0.8.2.2 org.scala-lang scala-library 2.11.8 org.scala-lang scala-reflect 2.11.8

生產者

public static void main(String[] args) throws IOException { Properties originalProps = new Properties(); //該originalProps存儲著我們訪問kafka集群所需要的信息 originalProps.load(KafkaProducerTest.class.getResourceAsStream("producer.properties")); //如果發送String類型的消息,一定要是有StringEncoder originalProps.put("serializer.class", "kafka.serializer.StringEncoder");  //1.獲取可以操作的對象 Producer producer = new Producer(new ProducerConfig(originalProps));  //2.發送數據 for(int i=0; i<5; i++){ KeyedMessage message = new KeyedMessage(topic, i+"--"); producer.send(message); }    //3.關閉 producer.close();}

分區類

kafka默認使用的分區類,是隨機分區。

我們可以指定分區方式,好處是讓同一類輸入一個partition,方便客戶端消費的時候,可以從同一個partition取到這一類數據。

//自定義分區,需要實現一個接口類public class MyParitioner implements Partitioner{ //一定要使用有參的構造方法 public MyParitioner(VerifiableProperties p) { } /** * @param key表示在生產者生產數據的時候,需要指定的。 */ @Override public int partition(Object key, int numPartitions) { if(key==null) return 0; return Integer.parseInt(key.toString())%2; }}

在生產者的代碼中,需要修改2個地方:

一個是在配置中,指定自定義的分區類

originalProps.put("partitioner.class", MyParitioner.class.getName());

一個是發送消息的時候,指定key

KeyedMessage message = new KeyedMessage(topic, i+"", i+"----");producer.send(message);

消費者

//1.獲取消費者對象ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(originalProps ));//2.消費數據Map topicCountMap = new HashMap<>();//topicCountMap中key指的是topic名稱,value指的是多少個線程來消費該topictopicCountMap.put("wuchao", 1); //只有1個線程消費topic主題//messageStreams結構中,key指的是topicMap>> messageStreams = consumer.createMessageStreams(topicCountMap);List> list = messageStreams.get("wuchao");//因為上面設置了1個線程,所以list中只有一個元素。返回值KafkaStream就是指向kafka集群中wuchao這個主題的流KafkaStream 
kafkaStream = list.get(0);ConsumerIterator iterator = kafkaStream.iterator();while(iterator.hasNext()){ //這是個阻塞操作。如果流中沒有數據,就在這裡等待 MessageAndMetadata mmd = iterator.next(); //獲得主題 String topic = mmd.topic(); //獲得分區編號 int partition = mmd.partition(); //指的是該消息的偏移量 long offset = mmd.offset(); //讀取消息 String message = new String(mmd.message()); System.out.println("topic="+topic+"\tpartition="+partition+"\toffset="+offset+"\tmessage="+message); //立刻修改zk中關於偏移量的值 consumer.commitOffsets();}

消費者如何查看自己消費到了哪個位置

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.1.100:2181 --topic wuchao -group group1

消費者是否可以隨意指定自己消費的位置

直接在zk中使用set命令修改消費的offset。

關於kafka集群,程序員的學習筆記,超詳細,看過都說好

是否可以指定每次都從開始消費

在consumer.properties中增加配置信息auto.offset.reset=earliest


分享到:


相關文章: