Kafka Source Code Analysis: The Producer

To use Kafka you first instantiate a KafkaProducer. The broker addresses and the key/value serializers are the mandatory Properties; acks (0, 1, or all), compression, retries, batch.size and the like are optional. This small set of settings controls most of the Producer's behavior; once instantiated, you call send to publish messages.
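
For illustration, here is a minimal sketch of such a configuration (the broker address is a placeholder, and StringSerializer is just one possible serializer choice):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        // mandatory: broker addresses and serializers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // optional: the tuning knobs mentioned above
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) calls go here
        }
    }
}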

The core implementation is this method:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); // ①
    return doSend(interceptedRecord, callback); // ②
}

Depending on how you call it, you get three messaging modes: fire-and-forget (ignore the returned result), synchronous (block on the returned Future, with the callback set to null), and asynchronous (supply a callback).
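
A quick sketch of the three call patterns, reusing the producer from the example above ("demo-topic" is a placeholder, and the checked exceptions thrown by get() are glossed over here):

// fire-and-forget: the returned Future is ignored
producer.send(new ProducerRecord<>("demo-topic", "k", "fire-and-forget"));

// synchronous: block on the Future (get() throws checked exceptions in real code)
RecordMetadata meta = producer.send(new ProducerRecord<>("demo-topic", "k", "sync")).get();

// asynchronous: the callback fires when the send completes or fails
producer.send(new ProducerRecord<>("demo-topic", "k", "async"), (metadata, exception) -> {
    if (exception != null)
        exception.printStackTrace(); // the send failed
    else
        System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
});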

Let's look at the fields of the message class ProducerRecord:

private final String topic;      // target topic
private final Integer partition; // target partition, may be null
private final Headers headers;   // record headers
private final K key;             // key, may be null
private final V value;           // value
private final Long timestamp;    // timestamp, may be null

It has several constructors to accommodate different message shapes: with or without a partition, with or without a key, and so on.
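
For example (topic and values are placeholders):

// full form: topic, partition, timestamp, key, value, headers
new ProducerRecord<>("demo-topic", 0, System.currentTimeMillis(), "k", "v", null);
// no partition: the partitioner picks one, based on the key
new ProducerRecord<>("demo-topic", "k", "v");
// neither partition nor key: round-robin assignment
new ProducerRecord<>("demo-topic", "v");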

In ①, the ProducerInterceptors (anywhere from zero to arbitrarily many, chained together) pre-process the ProducerRecord, e.g. stamping it with a timestamp or performing auditing and statistics:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // do not propagate the exception; log it and move on to the next interceptor
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}

If the user has configured interceptors, each one processes the record in turn and the transformed ProducerRecord is returned; otherwise the record is returned unchanged.
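
A user-defined interceptor only has to implement ProducerInterceptor. The sketch below (the class name TimestampInterceptor is hypothetical) prefixes every value with the send time and would be registered through the interceptor.classes property:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimestampInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // rebuild the record with the send time prepended to the value
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), System.currentTimeMillis() + "-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { /* no-op */ }

    @Override
    public void close() { /* no-op */ }

    @Override
    public void configure(Map<String, ?> configs) { /* no-op */ }
}

// registered via:
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TimestampInterceptor.class.getName());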

Then in ②, doSend actually dispatches the message, and it does so asynchronously (the source is long, so only the key lines are kept):

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // serialize the key and the value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            // ... error handling omitted ...
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            // ... error handling omitted ...
        }
        // compute the partition and build the TopicPartition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // callback and transaction handling omitted
        Header[] headers = record.headers().toArray();
        // append the message to the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        // if the batch is full or a new batch was created, wake the I/O thread
        // to ship it, i.e. call the sender's wakeup method
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        // notify the interceptors of the error, then rethrow
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

Here is the method that computes the partition:

private int partition(ProducerRecord<K, V> record,
                      byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // if the record carries a partition, use it; otherwise let the partitioner decide
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey,
                    record.value(), serializedValue, cluster);
}

The default partitioner, DefaultPartitioner, works as follows: if a partition is specified, use it directly; otherwise compute one from the key's hash; and if there is no key either, fall back to round-robin assignment.

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 * </ul>
 */
public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) { // no key
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); // currently available partitions
            if (availablePartitions.size() > 0) { // some are available: mod over those
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else { // none available: mod over all partitions
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else { // key present: hash it and mod by the partition count
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement(); // get then increment; combined with the mod above this yields round robin
    }
}
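
If the default strategy does not fit, a custom Partitioner can be plugged in via configuration; a minimal sketch (the class name SinglePartitioner is hypothetical):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SinglePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return 0; // route every record to partition 0
    }

    @Override
    public void close() { /* no-op */ }

    @Override
    public void configure(Map<String, ?> configs) { /* no-op */ }
}

// registered via:
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SinglePartitioner.class.getName());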

That covers the logical side of sending a message; next let's look at the physical side, the network I/O.

Sender is a Runnable wrapped in an I/O thread (ioThread) that continuously reads batches from the RecordAccumulator and ships them to the brokers through a Selector. Its wakeup method is really the KafkaClient interface's wakeup, implemented by NetworkClient; since it is built on NIO, this bottoms out in java.nio.channels.Selector.wakeup().

The main logic in Sender's run loop alternates between preparing data to send and waiting for network events:

long pollTimeout = sendProducerData(now); // ③
client.poll(pollTimeout, now); // ④

③ stages the outgoing Send on the channel and registers interest in the corresponding selection key's write event; KafkaChannel implements this:

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// the corresponding method in one of the transportLayer implementations
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}
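
Stripped of Kafka's wrappers, this is the standard JDK NIO idiom; a standalone sketch (the method name is mine, and it assumes the channel was already registered with the selector):

import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

static void enableWriteInterest(Selector selector, SocketChannel channel) {
    SelectionKey key = channel.keyFor(selector);                 // the key from an earlier register()
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);  // same OR trick as addInterestOps above
    selector.wakeup();                                           // unblocks a select() in progress, as Sender.wakeup ultimately does
}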

④ is essentially the Selector's poll, whose select call is unblocked by wakeup:

public void poll(long timeout) throws IOException {
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout); // wakeup unblocks this select
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); // so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); // poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; // no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}

Within it, the pollSelectionKeys method calls the following to actually complete the send:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

A Send is one outbound payload, usually implemented by ByteBufferSend or MultiRecordsSend. Its writeTo calls the transportLayer's write method, which is implemented by PlaintextTransportLayer or SslTransportLayer depending on whether SSL is in use:

public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}

At this point we have traced the Producer's two main flows: the business-level message handling and the non-business network plumbing. Its remaining features are guaranteed through configuration.

Ordering, for instance, hinges on max.in.flight.requests.per.connection. NetworkClient's doSend path checks it through canSendRequest, which consults InFlightRequests.canSendMore; setting the parameter to 1 guarantees that the next request cannot be sent while the current one is unacknowledged, which preserves order:

public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
            (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
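
In configuration terms this is one line (a sketch reusing the props from the first example):

// with more than one in-flight request, a failed-and-retried batch could land
// behind a later batch; capping in-flight requests at 1 rules that out
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.RETRIES_CONFIG, 3); // retries stay order-safe under this cap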

Reliability, for another, is controlled by acks. In Sender's sendProduceRequest, the clientRequest is given a completion callback:

RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        handleProduceResponse(response, recordsByPartition, time.milliseconds()); // ends up calling completeBatch
    }
};

/**
 * Complete or retry the delivery of the given batch; if the response reports
 * an acks-related error, the batch is retried here.
 *
 * @param batch The record batch
 * @param response The produce response
 * @param correlationId The correlation id for the request
 * @param now The current POSIX timestamp in milliseconds
 */
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
    // body omitted
}

public class ProduceResponse extends AbstractResponse {
    /**
     * Possible error code:
     * INVALID_REQUIRED_ACKS (21)
     */
}
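
Again as configuration (a sketch; the comments summarize the standard semantics of each value):

// acks=0  : fire-and-forget, no broker acknowledgement is awaited
// acks=1  : the partition leader alone must acknowledge the write
// acks=all: every in-sync replica must acknowledge (strongest guarantee)
props.put(ProducerConfig.ACKS_CONFIG, "all");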

The Kafka source is wrapped layer upon layer and quite intricate; if I have made mistakes anywhere, corrections are very welcome.
