03.05 RocketMQ 源碼分析Producer(9)

消息生產者的代碼都在client模塊中,相對於RocketMQ來講,消息生產者就是客戶端,也是消息的提供者。


RocketMQ 源碼分析Producer(9)

2.3.1 方法和屬性

1)主要方法介紹


RocketMQ 源碼分析Producer(9)

<code>//創建主題
void createTopic(final String key, final String newTopic, final int queueNum)
throws MQClientException;/<code>
<code>//根據時間戳從隊列中查找消息偏移量
long searchOffset(final MessageQueue mq, final long timestamp)/<code>
<code>//查找消息隊列中最大的偏移量
long maxOffset(final MessageQueue mq) throws MQClientException;/<code>
<code>//查找消息隊列中最小的偏移量
long minOffset(final MessageQueue mq) /<code>
<code>//根據偏移量查找消息
MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException;/<code>
<code>//根據條件查找消息
QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
final long end) throws MQClientException, InterruptedException;/<code>
<code>//根據消息ID和主題查找消息
MessageExt viewMessage(String topic,String msgId)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;/<code>


RocketMQ 源碼分析Producer(9)

<code>//啟動
void start() throws MQClientException;/<code>
<code>//關閉
void shutdown();/<code>
<code>//查找該主題下所有消息
List<messagequeue> fetchPublishMessageQueues(final String topic) throws MQClientException;/<messagequeue>/<code>
<code>//同步發送消息
SendResult send(final Message msg) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;/<code>
<code>//同步超時發送消息
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;/<code>
<code>//異步發送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;/<code>
<code>//異步超時發送消息
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;/<code>
<code>//發送單向消息
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;/<code>
<code>//選擇指定隊列同步發送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;/<code>
<code>//選擇指定隊列異步發送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;/<code>
<code>//選擇指定隊列單項發送消息
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;/<code>
<code>//批量發送消息
SendResult send(final Collection<message> msgs) throws MQClientException, RemotingException
, MQBrokerException,InterruptedException;/<message>/<code>

2)屬性介紹


RocketMQ 源碼分析Producer(9)

<code>producerGroup:生產者所屬組
createTopicKey:默認Topic
defaultTopicQueueNums:默認主題在每一個Broker隊列數量
sendMsgTimeout:發送消息默認超時時間,默認3s
compressMsgBodyOverHowmuch:消息體超過該值則啟用壓縮,默認4k

retryTimesWhenSendFailed:同步方式發送消息重試次數,默認為2,總共執行3次
retryTimesWhenSendAsyncFailed:異步方法發送消息重試次數,默認為2
retryAnotherBrokerWhenNotStoreOK:消息重試時選擇另外一個Broker時,
是否不等待存儲結果就返回,默認為false
maxMessageSize:允許發送的最大消息長度,默認為4M/<code>

2.3.2 啟動流程


RocketMQ 源碼分析Producer(9)

代碼:DefaultMQProducerImpl#start

<code>//檢查生產者組是否滿足要求
this.checkConfig();
//更改當前instanceName為進程ID
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//獲得MQ客戶端實例
this.mQClientFactory = MQClientManager.getInstance()
.getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);/<code>

整個JVM中只存在一個MQClientManager實例,維護一個MQClientInstance緩存表

ConcurrentMap<string> factoryTable = new ConcurrentHashMap<string>();/<string>/<string>

同一個clientId只會創建一個MQClientInstance。

MQClientInstance封裝了RocketMQ網絡處理API,是消息生產者和消息消費者與NameServer、Broker打交道的網絡通道

代碼:MQClientManager#getAndCreateMQClientInstance

<code>public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, 
RPCHook rpcHook) {
//構建客戶端ID
String clientId = clientConfig.buildMQClientId();
//根據客戶端ID或者客戶端實例
MQClientInstance instance = this.factoryTable.get(clientId);
//實例如果為空就創建新的實例,並添加到實例表中
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {

instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}

return instance;
}/<code>

代碼:DefaultMQProducerImpl#start

<code>//註冊當前生產者到到MQClientInstance管理中,方便後續調用網路請求
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please."
+ FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL)null);
}
//啟動生產者
if (startFactory) {
mQClientFactory.start();
}/<code>

2.3.3 消息發送


RocketMQ 源碼分析Producer(9)

代碼:DefaultMQProducerImpl#send(Message msg)

<code>//發送消息
public SendResult send(Message msg) {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}/<code>

代碼:DefaultMQProducerImpl#send(Message msg,long timeout)

<code>//發送消息,默認超時時間為3s
public SendResult send(Message msg,long timeout){
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}/<code>

代碼:DefaultMQProducerImpl#sendDefaultImpl

<code>/校驗消息
Validators.checkMessage(msg, this.defaultMQProducer);/<code>

1)驗證消息

代碼:Validators#checkMessage

<code>public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
//判斷是否為空
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// 校驗主題
Validators.checkTopic(msg.getTopic());
\t\t
// 校驗消息體
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");

}

if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}

if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}/<code>

2)查找路由

代碼:DefaultMQProducerImpl#tryToFindTopicPublishInfo

<code>private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//從緩存中獲得主題的路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//路由信息為空,則從NameServer獲取路由
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//如果未找到當前主題的路由信息,則用默認主題繼續查找
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}/<code>


RocketMQ 源碼分析Producer(9)

代碼:TopicPublishInfo

<code>public class TopicPublishInfo {
private boolean orderTopic = false;\t//是否是順序消息
private boolean haveTopicRouterInfo = false;
private List<messagequeue> messageQueueList = new ArrayList<messagequeue>();\t//該主題消息隊列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();//每選擇一次消息隊列,該值+1
private TopicRouteData topicRouteData;//關聯Topic路由元信息
}/<messagequeue>/<messagequeue>/<code>

代碼:MQClientInstance#updateTopicRouteInfoFromNameServer

<code>TopicRouteData topicRouteData;
//使用默認主題從NameServer獲取路由信息
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
//使用指定主題從NameServer獲取路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}/<code>

代碼:MQClientInstance#updateTopicRouteInfoFromNameServer

<code>//判斷路由是否需要更改
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {

changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}/<code>

代碼:MQClientInstance#updateTopicRouteInfoFromNameServer

<code>if (changed) {
//將topicRouteData轉換為發佈隊列
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
//遍歷生產
Iterator<entry>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<string> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
//生產者不為空時,更新publishInfo信息
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}/<string>/<entry>/<code>

代碼:MQClientInstance#topicRouteData2TopicPublishInfo

<code>public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
\t//創建TopicPublishInfo對象
TopicPublishInfo info = new TopicPublishInfo();
\t//關聯topicRoute
info.setTopicRouteData(route);
\t//順序消息,更新TopicPublishInfo
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}

info.setOrderTopic(true);
} else {
//非順序消息更新TopicPublishInfo
List<queuedata> qds = route.getQueueDatas();
Collections.sort(qds);
//遍歷topic隊列信息
for (QueueData qd : qds) {
//是否是寫隊列
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
//遍歷寫隊列Broker
for (BrokerData bd : route.getBrokerDatas()) {
//根據名稱獲得讀隊列對應的Broker
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}

if (null == brokerData) {
continue;
}

if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
\t\t\t\t//封裝TopicPublishInfo寫隊列
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}

info.setOrderTopic(false);
}
\t//返回TopicPublishInfo對象
return info;
}/<queuedata>/<code>


3)選擇隊列

  • 默認不啟用Broker故障延遲機制
  • 代碼:TopicPublishInfo#selectOneMessageQueue(lastBrokerName)

    <code>public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //第一次選擇隊列
    if (lastBrokerName == null) {
    return selectOneMessageQueue();
    } else {
    //sendWhichQueue
    int index = this.sendWhichQueue.getAndIncrement();
    //遍歷消息隊列集合
    for (int i = 0; i < this.messageQueueList.size(); i++) {
    //sendWhichQueue自增後取模
    int pos = Math.abs(index++) % this.messageQueueList.size();
    if (pos < 0)
    pos = 0;
    //規避上次Broker隊列
    MessageQueue mq = this.messageQueueList.get(pos);
    if (!mq.getBrokerName().equals(lastBrokerName)) {
    return mq;
    }
    }
    //如果以上情況都不滿足,返回sendWhichQueue取模後的隊列
    return selectOneMessageQueue();
    }
    }/<code>

    代碼:TopicPublishInfo#selectOneMessageQueue()

    <code>//第一次選擇隊列
    public MessageQueue selectOneMessageQueue() {
    //sendWhichQueue自增
    int index = this.sendWhichQueue.getAndIncrement();
    //對隊列大小取模
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
    pos = 0;
    //返回對應的隊列

    return this.messageQueueList.get(pos);
    }/<code>

    啟用Broker故障延遲機制

    <code>public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //Broker故障延遲機制
    if (this.sendLatencyFaultEnable) {
    try {
    //對sendWhichQueue自增
    int index = tpInfo.getSendWhichQueue().getAndIncrement();
    //對消息隊列輪詢獲取一個隊列
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
    pos = 0;
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    //驗證該隊列是否可用
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
    //可用
    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
    return mq;
    }
    }
    \t\t\t//從規避的Broker中選擇一個可用的Broker
    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    //獲得Broker的寫隊列集合
    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    if (writeQueueNums > 0) {
    //獲得一個隊列,指定broker和隊列ID並返回
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    if (notBestBroker != null) {
    mq.setBrokerName(notBestBroker);
    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    }
    return mq;
    } else {
    latencyFaultTolerance.remove(notBestBroker);
    }
    } catch (Exception e) {
    log.error("Error occurred when selecting message queue", e);
    }

    return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
    }/<code>


    RocketMQ 源碼分析Producer(9)

    延遲機制接口規範

    <code>public interface LatencyFaultTolerance {
    //更新失敗條目
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);
    \t//判斷Broker是否可用
    boolean isAvailable(final T name);
    \t//移除Fault條目

    void remove(final T name);
    \t//嘗試從規避的Broker中選擇一個可用的Broker
    T pickOneAtLeast();
    }
    /<code>

    FaultItem:失敗條目

    <code>class FaultItem implements Comparable<faultitem> {
    //條目唯一鍵,這裡為brokerName
    private final String name;
    //本次消息發送延遲
    private volatile long currentLatency;
    //故障規避開始時間
    private volatile long startTimestamp;
    }/<faultitem>/<code>

    消息失敗策略

    <code>public class MQFaultStrategy {
    //根據currentLatency本地消息發送延遲,從latencyMax尾部向前找到第一個比currentLatency小的索引,如果沒有找到,返回0
    \tprivate long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    //根據這個索引從notAvailableDuration取出對應的時間,在該時長內,Broker設置為不可用
    \tprivate long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    }/<code>

    原理分析

    代碼:DefaultMQProducerImpl#sendDefaultImpl

    <code>sendResult = this.sendKernelImpl(msg, 
    mq,
    communicationMode,

    sendCallback,
    topicPublishInfo,
    timeout - costTime);
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);/<code>

    如果上述發送過程出現異常,則調用DefaultMQProducerImpl#updateFaultItem

    <code>public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    //參數一:broker名稱
    //參數二:本次消息發送延遲時間
    //參數三:是否隔離
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }/<code>

    代碼:MQFaultStrategy#updateFaultItem

    <code>public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
    //計算broker規避的時長
    long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    //更新該FaultItem規避時長
    this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
    }/<code>

    代碼:MQFaultStrategy#computeNotAvailableDuration

    <code>private long computeNotAvailableDuration(final long currentLatency) {
    //遍歷latencyMax
    for (int i = latencyMax.length - 1; i >= 0; i--) {
    //找到第一個比currentLatency的latencyMax值
    if (currentLatency >= latencyMax[i])
    return this.notAvailableDuration[i];
    }
    //沒有找到則返回0
    return 0;
    }/<code>

    代碼:LatencyFaultToleranceImpl#updateFaultItem

    <code>

    4)發送消息

    消息發送API核心入口DefaultMQProducerImpl#sendKernelImpl

    <code>

    代碼:DefaultMQProducerImpl#sendKernelImpl

    <code>//獲得broker網絡地址信息
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
    //沒有找到從NameServer更新broker網絡地址信息
    tryToFindTopicPublishInfo(mq.getTopic());
    brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }/<code>
    <code>//為消息分類唯一ID
    if (!(msg instanceof MessageBatch)) {
    MessageClientIDSetter.setUniqID(msg);
    }

    boolean topicWithNamespace = false;
    if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
    topicWithNamespace = true;
    }
    //消息大小超過4K,啟用消息壓縮
    int sysFlag = 0;
    boolean msgBodyCompressed = false;
    if (this.tryToCompressMessage(msg)) {
    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
    msgBodyCompressed = true;
    }
    //如果是事務消息,設置消息標記MessageSysFlag.TRANSACTION_PREPARED_TYPE

    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }/<code>
    <code>//如果註冊了消息發送鉤子函數,在執行消息發送前的增強邏輯
    if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
    context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
    context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
    }/<code>

    代碼:SendMessageHook

    <code>public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
    }/<code>

    代碼:DefaultMQProducerImpl#sendKernelImpl

    <code>//構建消息發送請求包
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    //生產者組

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    //主題
    requestHeader.setTopic(msg.getTopic());
    //默認創建主題Key
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    //該主題在單個Broker默認隊列樹
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    //隊列ID
    requestHeader.setQueueId(mq.getQueueId());
    //消息系統標記
    requestHeader.setSysFlag(sysFlag);
    //消息發送時間
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    //消息標記
    requestHeader.setFlag(msg.getFlag());
    //消息擴展信息
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    //消息重試次數
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    //是否是批量消息等
    requestHeader.setBatch(msg instanceof MessageBatch);
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }

    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
    }/<code>
    <code>case ASYNC:\t\t//異步發送
    Message tmpMessage = msg;
    boolean messageCloned = false;
    if (msgBodyCompressed) {
    //If msg body was compressed, msgbody should be reset using prevBody.
    //Clone new message using commpressed message body and recover origin massage.
    //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
    tmpMessage = MessageAccessor.cloneMessage(msg);

    messageCloned = true;
    msg.setBody(prevBody);
    }

    if (topicWithNamespace) {
    if (!messageCloned) {
    tmpMessage = MessageAccessor.cloneMessage(msg);
    messageCloned = true;
    }
    msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(),
    this.defaultMQProducer.getNamespace()));
    }

    \t\tlong costTimeAsync = System.currentTimeMillis() - beginStartTime;
    \t\tif (timeout < costTimeAsync) {
    \t\t throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    \t\t}
    \t\tsendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
    \t\t\tbrokerAddr,
    \t\t\tmq.getBrokerName(),
    \t\t\ttmpMessage,
    \t\t\trequestHeader,
    \t\t\ttimeout - costTimeAsync,
    \t\t\tcommunicationMode,
    \t\t\tsendCallback,
    \t\t\ttopicPublishInfo,
    \t\t\tthis.mQClientFactory,
    \t\t\tthis.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
    \t\t\tcontext,
    \t\t\tthis);
    \tbreak;
    case ONEWAY:
    case SYNC:\t\t//同步發送
    long costTimeSync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeSync) {
    throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }
    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
    brokerAddr,
    mq.getBrokerName(),
    msg,
    requestHeader,
    timeout - costTimeSync,
    communicationMode,
    context,
    this);
    break;
    default:
    assert false;
    break;

    }/<code>
    <code>

    2.3.4 批量消息發送


    RocketMQ 源碼分析Producer(9)

    批量消息發送是將同一個主題的多條消息一起打包發送到消息服務端,減少網絡調用次數,提高網絡傳輸效率。當然,並不是在同一批次中發送的消息數量越多越好,其判斷依據是單條消息的長度,如果單條消息內容比較長,則打包多條消息發送會影響其他線程發送消息的響應時間,並且單批次消息總長度不能超過DefaultMQProducer#maxMessageSize。

    批量消息發送要解決的問題是如何將這些消息編碼以便服務端能夠正確解碼出每條消息的消息內容。

    代碼:DefaultMQProducer#send

    <code>public SendResult send(Collection<message> msgs) 
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //壓縮消息集合成一條消息,然後發送出去
    return this.defaultMQProducerImpl.send(batch(msgs));
    }/<message>/<code>

    代碼:DefaultMQProducer#batch

    <code>private MessageBatch batch(Collection<message> msgs) throws MQClientException {
    MessageBatch msgBatch;
    try {
    //將集合消息封裝到MessageBatch
    msgBatch = MessageBatch.generateFromList(msgs);
    //遍歷消息集合,檢查消息合法性,設置消息ID,設置Topic
    for (Message message : msgBatch) {
    Validators.checkMessage(message, this);
    MessageClientIDSetter.setUniqID(message);
    message.setTopic(withNamespace(message.getTopic()));
    }
    //壓縮消息,設置消息body
    msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
    throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    //設置msgBatch的topic
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
    }/<message>/<code>


    分享到:


    相關文章: