Kafka入門-Consumer API 使用

消費者 API 允許應用程序從Kafka集群中的主題讀取數據流

注:使用的是新版本的 Java 語言編寫的 API,不對 0.8.x 及以前版本進行討論

maven 依賴

<dependency>
<groupid>org.apache.kafka/<groupid>
<artifactid>kafka-clients/<artifactid>
<version>2.3.0/<version>
/<dependency>

簡單示例

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 2019年10月25日 superz add
*/
public class ConsumerDemo
{
public static String brokerServers = "localhost:9092";
public static String topic = "test";
public static String groupId = "testgroup";
public static Long timeout = 1000L;
public static void main(String[] args) {
Properties props = new Properties(); // ①
props.put("bootstrap.servers", brokerServers);
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
// 構建消費者客戶端實例
Consumer<string> consumer = new KafkaConsumer<string>(props); // ②
// 訂閱主題
consumer.subscribe(Collections.singleton(topic)); // ③
// 循環消費消息
try {
while (true) {
ConsumerRecords<string> records = consumer.poll(Duration.ofMillis(timeout)); // ④

for (ConsumerRecord<string> record : records) {
System.out.println(record.value());
}
}
}finally {
consumer.close(); // ⑤
}
}
/<string>/<string>/<string>/<string>

示例分析

參數配置

類似生產者,在簡單示例 ① 也是進行參數配置,其中有 4 個必填參數:

  • bootstrap.servers:指定了 Kafka 集群的連接字符串。
  • key.deserializer:指定類把字節數組轉換成 Java 對象
  • value.deserializer:指定類把字節數組轉換成 Java 對象
  • group.id:指定了 KafkaConsumer 屬於哪一個消費者群組

KafkaConsumer 除了上面 4 個參數,還有非常多的參數,開發人員可以根據實際的業務需求來進行設置以解決最終的問題。

在實際使用情況下很難記住所有的參數名稱,只能有個大概的印象,可以直接使用客戶端中的 org.apache.kafka.clients.consumer.ConsumerConfig 類來獲取對應的參數,如下:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

創建消費者實例

簡單示例 ② 構建了消費者實例。

訂閱主題與分區

在創建好消費者之後,需要進行簡單示例 ③ 步驟,為消費者訂閱相關的主題,一個消費者可以訂閱一個或多個主題。

subscribe() 方法既可以以集合的方式訂閱多個主題,也可以以正則表達式的形式訂閱特定模式的主題,subscribe 有多個重載方法如下:

public void subscribe(Collection<string> topics)
public void subscribe(Collection<string> topics, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
/<string>/<string>

注意:如果前後兩次訂閱了不同的主題,那麼消費者以最後一次的為準,如下:

consumer.subscribe(Collections.singleton(topic1));
consumer.subscribe(Collections.singleton(topic2));

上面的示例中,最終消費者訂閱的是 topic2。

如果消費者採用的是正則表達式的方式訂閱,在使用一段時間後,如果又創建了新的主題,並且主題的名字與正則表達式相匹配,那麼這個消費者就可以消費到新添加的主題中的消息。

消費者不僅可以通過 subscribe() 方法訂閱主題,還可以直接訂閱某些主題的特定分區,在 KafkaConsumer 中還提供了一個 assign() 方法來實現這個功能,源碼定義如下:

public void assign(Collection<topicpartition> partitions)
/<topicpartition>

這個方法只有一個參數 partitions,用來指定需要訂閱的分區集合。但可能在開發過程中並不知道主題具體的分區信息,可以通過 KafkaConsumer 中的 partitionsFor() 方法來查詢指定主題的分區信息,partitionsFor() 方法的源碼定義如下:

public List<partitioninfo> partitionsFor(String topic)
public List<partitioninfo> partitionsFor(String topic, Duration timeout)
/<partitioninfo>/<partitioninfo>

消息消費

Kafka 中的消費是基於拉模式的,也就是消費者主動向服務端發起請求來拉取消息。

從簡單示例 ④ 可以看出,Kafka 中的消息消費是一個不斷輪詢的過程,消費者重複的執行 poll() 方法,poll() 方法返回的是所訂閱的主題(分區)上的一組消息。

對於 poll() 方法,如果某些分區中沒有可消費的消息,那麼此分區返回的消息結果為空;如果訂閱的所有分區中都沒有可消費的消息,那麼返回空的消息集合。

poll() 方法在源碼中的定義,如下:

public ConsumerRecords poll(final Duration timeout)
@Deprecated
public ConsumerRecords poll(final long timeoutMs)

在 poll() 方法中有一個超時時間參數 timeout/timeoutMs,用來控制 poll()方法的阻塞時間,在消費者的緩衝區裡沒有可用數據時會發生阻塞。

poll() 方法的返回類型是 ConsumerRecords,它用來表示一次拉取操作所獲得的消息集。

消費者消費到的每條消息的類型為 ConsumerRecord,這個和生產者中的消息對象 ProducerRecord 相對應。poll() 獲取的 ConsumerRecords 內部包含了若干個 ConsumerRecord,ConsumerRecords 提供了多個方法來獲取 ConsumerRecord:

  • iterator():循環遍歷消息集內部的消息
  • records(TopicPartition partition):獲取消息集中指定分區的消息
  • records(String topic):獲取消息集中指定主題的消息

關閉消費者實例

消費者實例在用完之後一定要顯式地執行關閉動作以釋放運行過程中佔用地各種系統資源,包括內存資源、Socket 連接,示例見簡單示例 ⑤

參考

Consumer API 的 javadoc:http://kafka.apachecn.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


分享到:


相關文章: