Kafka生產者原理-消息整理

對於一條消息而言,首先需要在生產者生產出來,然後發送給Broker存儲,最終由消費者拉取處理。對於Kafka的生產者端的代碼,有新舊兩個版本。新客戶端採用Java編寫,源碼在 client 目錄下面;老版本由Scala 編寫(目前已標記為"廢棄"),源碼放在 core 目錄下面。

發送消息Demo

對於生產者的消息生產,官方給出的 Demo 裡面有同步和異步兩種方式。

public class Producer extends Thread {
 private final KafkaProducer producer;
 private final String topic;
 private final Boolean isAsync;
 public Producer(String topic, Boolean isAsync) {
 Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
 props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 producer = new KafkaProducer<>(props);
 this.topic = topic;
 this.isAsync = isAsync;
 }
 public void run() {
 int messageNo = 1;
 while (true) {
 String messageStr = "Message_" + messageNo;
 long startTime = System.currentTimeMillis();
 if (isAsync) { // 異步發送
 producer.send(new ProducerRecord<>(topic, messageNo, messageStr), 
 new DemoCallBack(startTime, messageNo, messageStr));
 } else { 同步發送
 try {
 producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
 System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
 } catch (InterruptedException | ExecutionException e) {
 e.printStackTrace();
 }
 }
 ++messageNo;
 }
 }
}
// 事件回調
class DemoCallBack implements Callback {
 private final long startTime;
 private final int key;
 private final String message;
 public DemoCallBack(long startTime, int key, String message) {
 this.startTime = startTime;
 this.key = key;
 this.message = message;
 }
 public void onCompletion(RecordMetadata metadata, Exception exception) {
 long elapsedTime = System.currentTimeMillis() - startTime;
 if (metadata != null) {
 System.out.println(
 "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
 } else {
 exception.printStackTrace();
 }
 }
}

上面的 Producer 是一個線程,上面的例子我們只需要創建一個 Producer 線程並啟動即可。對於同步異步的處理,調用 KafkaProducer.send() 的時候,返回的結果是一個 Future 對象。當調用 Future.get() 的時候,會同步等待結果返回;如果不調用,則就是異步提交。

KafkaProducer 執行流程

Kafka生產者原理-消息整理

Kafka 生產者的大概執行流程如上圖所示。當我們在業務系統中調用 KafkaProducer.send() 時,首先會進入執行 Producer 端的 攔截器鏈(在實際業務代碼中,使用到攔截器的場景比較少)。執行完成之後,然後進入 doSend() 的異步發送流程。首先會等待對應 topic 的元數據可用,也就是檢查 Cluster 中是否有對應的 topic ;然後分別對 key 和 value 序列化為 字節數組;之後再根據規則找到 topic 對應的分區,如果沒有顯示指定 topic 的分區,則使用默認的輪詢策略或者如果指定了 key ,則按照 key 做 hash 分區;然後檢查序列化的大小是否超過限制,如果滿足限制,則調用 RecordAccumulator.append() 方法將消息追加到 RecordAccumulator 的管理隊列裡面去;最後返回一個 Future 類型的結果給業務代碼。

public Future send(ProducerRecord record, Callback callback) {
	// 執行發送前的攔截器鏈
	ProducerRecord interceptedRecord = this.interceptors.onSend(record);
	return doSend(interceptedRecord, callback);
}
/**
 * 異步發送消息執行.
 */
private Future doSend(ProducerRecord record, Callback callback) {
	// 首先確保 topic 元數據信息可用, 也就是有分區可用
	ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
	long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
	Cluster cluster = clusterAndWaitTime.cluster;
	// 分別序列化 key value
	byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
	byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
	// 計算出分區
	int partition = partition(record, serializedKey, serializedValue, cluster);
	TopicPartition tp = new TopicPartition(record.topic(), partition);
	setReadOnly(record.headers());
	Header[] headers = record.headers().toArray();
	int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers);
	ensureValidRecordSize(serializedSize);
	long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
	
	// 封裝 callback, 以委派給 RecordAccumulator 處理後續流程
	Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
	RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
	/** batch 隊列中的大小 > 1 || 新 batch 已經滿了 */
	if (result.batchIsFull || result.newBatchCreated) {
		this.sender.wakeup();
	}
	return result.future;
}

為消息指定分區

對於一個 Topic 的消息而言,需要有一個唯一確定的分區來接收消息。在創建 KafkaProducer 的時候,我們可以在構造器中指定分區算法。自定義分區算法,需要實現 Partitioner 接口,並重寫其中的 partition() 方法。如果我們沒有顯示的指定分區規則,那麼默認會用 DefaultPartitioner 的實現,其分區方法如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
 List partitions = cluster.partitionsForTopic(topic);
 int numPartitions = partitions.size();
 // 1、如果沒有指定 key, 則使用輪詢策略, 均勻分佈
 if (keyBytes == null) {
 int nextValue = nextValue(topic);
 List availablePartitions = cluster.availablePartitionsForTopic(topic);
 if (availablePartitions.size() > 0) {
 int part = Utils.toPositive(nextValue) % availablePartitions.size();
 return availablePartitions.get(part).partition();
 } else {
 // no partitions are available, give a non-available partition
 return Utils.toPositive(nextValue) % numPartitions;
 }
 } else {
 // 2、如果指定了 key, 則先對 key 做 Hash, 然後取模
 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
 }
}

從 Kafka 默認的分區規則看來,如果沒有為消息指定 key,則使用輪詢的策略;如果指定了 key,就按照 key 做 Hash,然後取模。

Kafka生產者原理-消息整理

對於 Kafka 的默認消息分區策略,細心的讀者可能注意到,在獲取 Topic 分區列表的時候,獲取的是可用分區列表。也就是說,如果此時為 Topic 為 tp1 的主題發送消息時(假設tp1 有 4 個分區,網絡抖動前被分配到了 p3),如果此時第 4 個分區(p3)因為網絡抖動不可用了,此時獲取的可用分區就是 3 個。而我們對於指定 key 的消息,此時就打到了不同的分區。如果對消息的順序性有要求的話,此時就可能造成消息的亂序。

Kafka生產者原理-消息整理

客戶端記錄收集器

從 KafkaProducer 的執行流程中我們可以看出,消息的存儲最終委派給給了 RecordAccumulator。RecordAccumulator 是以 TopicPartition 的維度去批量存儲待發送數據的。

Kafka生產者原理-消息整理

從上面可以看出,消息在 RecordAccumulator 中是以“批次”的形式管理的。當一個批次的消息滿了之後,就可以準備被髮送到 Broker 端了。

下面我們看一下,RecordAccumulator 的代碼執行流程。

Kafka生產者原理-消息整理

當我們調用完 RecordAccumulator.append() 方法的時候,首先會獲取 topic 對應的 batch 隊列,此時如果沒有則會創建一個 topic 對應的隊列 dq。然後對 dq 加鎖,調用 tryAppend() 嘗試追加消息,如果追加結果不為空則成功,如果為空則失敗。此時說明當前 topic 對應的 batch 隊列的最後一個 batch 已經滿了,需要重新創建一個了。上面的流程有一點需要注意的是,對於 dq 的加鎖並不是全局的,減小了加鎖的粒度。此時再次對 dq 加鎖之後,有個雙重檢測的過程,如果再次添加失敗,則需要創建一個新的 batch,append 完消息之後,追加到 dq 的尾部。之後會返回 RecordAppendResult 的結果給到 KafkaProducer,然後判斷 RecordAccumulator 管理的 batch 隊列是否符合發送條件,如符合則委派 Sender 發送消息批次。

/**
 * 添加消息到 accumulator, 並返回結果
 */
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
 /** 記錄併發調用的線程數 */
 appendsInProgress.incrementAndGet();
 ByteBuffer buffer = null;
 if (headers == null) headers = Record.EMPTY_HEADERS;
 try {
 // 獲取 tp 對應的 batch 隊列
 Deque dq = getOrCreateDeque(tp);
 synchronized (dq) {
 if (closed)
 throw new KafkaException("Producer closed while send in progress");
 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
 if (appendResult != null)
 return appendResult;
 }
 // 追加失敗(說明 queue 尾部的 batch 已滿), 則申請空間 創建新的 batch
 byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
 int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
 log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
 buffer = free.allocate(size, maxTimeToBlock);
 synchronized (dq) {
 if (closed)
 throw new KafkaException("Producer closed while send in progress");
 // 雙重檢測
 RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
 if (appendResult != null) {
 return appendResult;
 }
 // 創建新 batch, 添加消息
 MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
 ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
 dq.addLast(batch);
 incomplete.add(batch);
 // Don't deallocate this buffer in the finally block as it's being used in the record batch
 buffer = null;
 return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
 }
 } finally {
 if (buffer != null)
 free.deallocate(buffer);
 appendsInProgress.decrementAndGet();
 }
}

消息發送時的整理

通過之前的描述,我們知道 RecordAccumulator 追加消息的維度是 TopicPartition。那麼我們此時要發送給對應的 Broker 的時候,應該要怎樣處理呢?是不是還是需要按照 TopicPartition 的維度呢?

當然按照 TopicPartition 的維度發送也沒有關係,但是更高效率的是我們可以按照 Broker 的維度去發送 batchs。因此在 KafkaProducer 發現批次滿足發送條件,委派 Sender 發送時,會重新整合 batchs 的維度,以減少網絡開銷。

Kafka生產者原理-消息整理

參考:《Kafka技術內幕》、《Apache Kafka源碼剖析》、Kafka源碼


分享到:


相關文章: