RocketMQ Consumer接收消息流程

 這節介紹Consumer接收消息的流程,分為Pull和Push模式。

1. 初始化

 上一節講Rebalance時提到,Consumer接受客戶端有兩種方式:

  1. Broker發現客戶端列表有變化,通知所有Consumer執行Rebalance
  2. Consumer定時每20秒自動執行Rebalance

其中1.的通知到達Consumer後,會立即觸發Rebalance,然後會重置2.的定時器等待時間。二者最後通知Consumer的方式為

  1. Push模式:當有新的Queue分配給客戶端時,會新包裝一個PullRequest,用於後續自動拉取消息,具體到DefaultMQPushConsumerImpl的executePullRequestImmediately方法
  2. Pull模式:回調DefaultMQPullConsumerImpl的MessageQueueListener有Queue發生改變

2. Push模式

 executePullRequestImmediately的內容為:

<code>\tpublic void executePullRequestImmediately(final PullRequest pullRequest) {
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}/<code>

即將PullRequest對象傳給了PullMessageService的executePullRequestImmediately方法:

<code>public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}/<code>

PullMessageService的結構如下:

RocketMQ Consumer接收消息流程

內部維護著一個LinkedBlockingQueue屬性pullRequestQueue,用於存儲待處理的PullRequest;還有一個ScheduledExecutorService,用於延期處理PullRequest。具體流程如下:

RocketMQ Consumer接收消息流程

  1. RebalanceImpl調用DefaultMQPushConsumerImpl的executePullRequestImmediately方法,傳入PullRequest
  2. DefaultMQPushConsumerImpl內部調用PullMessageService的executePullRequestImmediately方法,該方法會把傳入的PullRequest對象放到LinkedBlockingQueue中進行存儲,等待後續處理。
  3. PullMessageService會循環從隊列中出隊一個PullRequest,並調用自身的pullMessage用於後續處理。該動作會從MQClientInstance中選擇對應的客戶端實例DefaultMQPushConsumerImpl,並委託給它的pullMessage方法。
  4. DefaultMQPushConsumerImpl會先判斷當前請求是否滿足條件,如果不滿足條件,會調用PullMessage的executePullRequestLater方法,將當前請求延後處理。如果滿足條件,會封裝一個PullCallback對象用於處理異步消息,並調用PullAPIWrapper異步請求Broker拉取消息。

從上面的過程可以看出,Push模式內部還是客戶端主動去拉取的,即所謂的封裝拉模式以實現推模式,簡單示意圖如下:

RocketMQ Consumer接收消息流程

內部通過PullMessageService循環的從PullRequest對應MessageQueue中主動拉取數據。

2.1. DefaultMQPushConsumerImpl.pullMessage(PullRequest)

 該方法用於完成從MessageQueue拉取消息的過程,主要過程如下:

RocketMQ Consumer接收消息流程

1.判斷該MessageQueue對應的PullRequest是否已經標記為drop,如果是則直接返回

2.進行一系列的檢查,如果檢查不通過,則等待一定時間後再放回PullMessageService的待處理隊列中,主要是通過PullMessageService中的ScheduledExecutorService來做到延遲執行,涉及的情況包括:

2.1. 如果客戶端未準備就緒(DefaultMQPushCOnsumerImpl執行start後status為RUNNING),則延遲PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)後再放回PullMessage的隊列中

2.2.如果是暫停狀態,則延遲PULL_TIME_DELAY_MILLS_WHEN_SUSPEND(1000)後再放回PullMessageService的等待隊列中

2.3.如果緩存的消息數大於配置的拉取線程數閾值(默認1000),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)後再返回等待隊列中處理

2.4.如果緩存的消息大小大於配置的拉取大小閾值(默認100M),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)後再返回等待隊列中處理

2.5.緩存的數據offset相差的偏移量超過設定值(默認2000),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)後再返回等待隊列中處理

2.6.如果沒有訂閱MessageQueue對應的topic,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)後再返回隊列中處理

3.包裝PullCallback對象,並調用PullAPIWrapper發起異步請求拉取消息

上面通過PullAPIWrapper收到結果後會將結果包裝為PullResult對象並回調PullCallback。PullCallback和PullResult的定義如下:

<code>public interface PullCallback {
void onSuccess(final PullResult pullResult);

void onException(final Throwable e);
}/<code>


<code>public class PullResult {
private final PullStatus pullStatus;//請求狀態

private final long nextBeginOffset;//Broker返回的下一次開始消費的offset
private final long minOffset;
private final long maxOffset;
private List<messageext> msgFoundList;//消息列表,一次請求返回一批消息
}/<messageext>/<code>

下面為pullMessage方法處理異步返回結果的流程:

RocketMQ Consumer接收消息流程

1.如果請求失敗,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)後再放回PullMessageService的待處理隊列中;處理成功則進入2.

2.調用PullAPIWrapper對結果進行預處理

3.根據請求狀態進行處理

3.1.有新消息(FOUND)

3.1.1.設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset

3.1.2.如果結果列表為空則不延遲,立馬放到PullMessageService的待處理隊列中,否則進入3

3.1.3.將PullResult中的結果List 放入ProcessQueue的緩存中,並通知ConsumeMessageService處理

3.1.4.將該PullRequest放回待處理隊列中等待再次處理,如果有設置拉取的間隔時間,則等待該時間後再翻到隊列中等待處理,否則直接放到隊列中等待處理

3.2.沒有新消息(NO_NEW_MSG)

3.2.1.設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset

3.2.2.如果緩存的待消費消息數為0,則更新offset存儲

3.2.3.將PullRequest立馬放到PullMessageService的待處理隊列中

3.3.沒有匹配的消息(NO_MATCHED_MSG)

3.3.1.設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset

3.3.2.如果緩存的待消費消息數為0,則更新offset存儲

3.3.3.將PullRequest立馬放到PullMessageService的待處理隊列中

3.4.不合法的偏移量(OFFSET_ILLEGAL)

3.4.1.設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset

3.4.2.標記該PullRequset為drop

3.4.3.10s後再更新並持久化消費offset;再通知Rebalance移除該MessageQueue

 下面先介紹下ProcessQueue,這裡只標識幾個相關的屬性:

<code>public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
//緩存的待消費消息,按照消息的起始offset排序
private final TreeMap*消息的起始offset*/Long, MessageExt> msgTreeMap = new TreeMap<long>();
//緩存的待消費消息數量
private final AtomicLong msgCount = new AtomicLong();
//緩存的待消費消息大小
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<long> consumingMsgOrderlyTreeMap = new TreeMap<long>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
//最近執行pull的時間
private volatile long lastPullTimestamp = System.currentTimeMillis();
//最近被客戶端消費的時間
private volatile long lastConsumeTimestamp = System.currentTimeMillis();
private volatile boolean locked = false;
private volatile long lastLockTimestamp = System.currentTimeMillis();
//當前是否在消費,用於順序消費模式,對並行消費無效
private volatile boolean consuming = false;
private volatile long msgAccCnt = 0;
}/<long>/<long>/<long>/<code>

ProcessQueue展示了MessageQueue的消費情況。上面提到,發起pull請求後如果有數據,會先放到ProcessQueue的緩存中,即msgTreeMap屬性,因而緩存的消息會按照消息的起始offset被排序存儲。通過ProcessQueue可以查看MessageQueue當前的處理情況,ProcessQueue還用於輔助實現順序消費。

2.2 ConsumeMessageService

 異步返回的消息內容將交給ConsumeMessageService處理,ConsumeMessageService是個接口,方法定義如下:

<code>public interface ConsumeMessageService {
void start();

void shutdown();

void updateCorePoolSize(int corePoolSize);

void incCorePoolSize();

void decCorePoolSize();

int getCorePoolSize();

ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);

void submitConsumeRequest(
final List<messageext> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume);
}/<messageext>/<code>

通過定義可見,要求實現類提供異步處理的功能。內部提供的實現類有:

RocketMQ Consumer接收消息流程

ConsumeMessageConcurrentlyService:並行消費;ConsumeMessageOrderlyService:順序消費,這裡重點看ConsumeMessageConcurrentlyService。異步請求後會將拉取的新消息列表交給submitConsumeRequest方法處理,如下:

RocketMQ Consumer接收消息流程

該方法會將傳入的消息列表分裝為一個ConsumeRequest,並提到到線程池中等待處理。如果傳入的消息列表長度超過設定值(默認為1),則會分多個批處理。

 在介紹消費具體過程之前先回顧客戶端啟動流程的Demo,接收消息的寫法如下:

<code>public class Consumer {

public static void main (String[] args) throws InterruptedException, MQClientException {

// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");

// 設置NameServer的地址
consumer.setNamesrvAddr ("localhost:9876");

// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe ("TopicTest", "*");
// 註冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage (List<messageext> msgs, ConsumeConcurrentlyContext context) {
System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
// 標記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實例
consumer.start ();
System.out.printf ("Consumer Started.%n");
}
}/<messageext>/<code>

其中註冊了一個MessageListenerConcurrently,該類將用於用戶端處理消息。

 回過來看ConsumeRequest,該類實現了Runnable接口,會在run方法完成主要的處理工作,主要動作為:

  1. 調用DefaultMQPushConsumerImpl.executeHookBefore執行前置hook動作
  2. 調用MessageListenerConcurrently.consumeMessage通知用戶端處理消息,即上面demo內容,會返回處理結果ConsumeConcurrentlyStatus
  3. 調用DefaultMQPushConsumerImpl.executeHookAfter執行後置hook動作
  4. ConsumeMessageConcurrentlyService.processConsumeResult根據ConsumeConcurrentlyStatus執行收尾動作

2.2.1. MessageListenerConcurrently.consumeMessage

 用戶真正接收消息並執行處理動作的地方,需要返回ConsumeConcurrentlyStatus告知框架處理結果。這裡在方法裡最好不要做耗時長的任務,快速處理後返回給框架結果,避免消息堆積在線程池中。可以將消息內容複製一遍後再放到線程池中進行分發處理。

2.2.2. ConsumeMessageConcurrentlyService.processConsumeResult

 該方法主要在用戶消費完數據後進行收尾動作,過程如下:

RocketMQ Consumer接收消息流程

ConsumerRequest在run方法的開始處,實例化了一個ConsumeConcurrentlyContext對象,用於後續傳遞內容,該定義為:

<code>public class ConsumeConcurrentlyContext {
private final MessageQueue messageQueue;
//重試的延遲級別,-1:不重試;0:由broker控制;>0由客戶端控制
private int delayLevelWhenNextConsume = 0;
//消息列表最後一個正常消費的消息索引號
private int ackIndex = Integer.MAX_VALUE;
}/<code>

其中ackIndex表示最後一個正常消費的消息索引號(0從開始,0~ackIndex為正常消費),該位置後的消息表示沒法正常消費。該值由用戶端控制,可以通過ackIndex來控制需要重發的消息。

 ackIndex默認值為Integer.MAX_VALUE,如果為該值則認為所有消息正常消費,不存在錯誤。上面流程中統計成功和失敗也是根據ackIndex來判斷的,對於ackIndex後的消息,如果是集群消費模式,則會先嚐試發送回broker,由broker控制重試時機;如果重試失敗,會收集這些失敗的消息,延遲5秒後再調用一次ConsumeMessageService.submitConsumeRequest讓用戶端再次處理。最後會將處理成功的消息從ProcessQueue中移除,更新緩存,然後將q消費的偏移量記錄下來,等待後臺線程同步到broker或者本地。

 綜合上面的介紹,Push模式下的處理流程大致如下:

RocketMQ Consumer接收消息流程

Push模式通過PullMessageService循環從監聽的MessageQueue中以Pull模式拉取消息,並分發給用戶註冊的MesageListenerConsurrently對象處理,完了之後會自動處理消息的重試,offset更新等動作,從而模擬消息從Broker端主動推動過來。

2. Pull模式

 同Push模式一樣,Pull模式的觸發也是通過Rebalance,如下:

RocketMQ Consumer接收消息流程

同開頭提及的一樣,會回調DefaultMQPullConsumerImpl的MessageQueueListener有Queue發生改變。

 系統提供了MQPullConsumerScheduleService,可以定時以Pull模式拉取消息,並將結果通知MessageQueueListener,內部的實現為:

<code>class MessageQueueListenerImpl implements MessageQueueListener {
@Override
public void messageQueueChanged(String topic, Set<messagequeue> mqAll, Set<messagequeue> mqDivided) {//mqAll該topic下的所有q,mqDivided該實例分配到的q
MessageModel messageModel =
MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
switch (messageModel) {
case BROADCASTING:
MQPullConsumerScheduleService.this.putTask(topic, mqAll);//通知該topic下的監聽器,最新的所有q
break;
case CLUSTERING:
MQPullConsumerScheduleService.this.putTask(topic, mqDivided);//通知該topic下的監聽器,該實例分配的q
break;
default:
break;
}
}
}/<messagequeue>/<messagequeue>/<code>

putTask會將分配到的新的MessageQueue包裝為一個PullTaskImpl,PullTaskImpl實現了Runnable,會在後臺一直執行;而將不屬於自己處理的MessageQueue對應的PullTaskImpl停掉。PullTaskImpl會查找該MessageQueue所監聽topic對應的處理類PullTaskCallback,調用doPullTask,將具體動作讓用戶處理。

 MQPullConsumerScheduleService的例子為:

<code>public class PullScheduleService {

public static void main(String[] args) throws MQClientException {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

scheduleService.setMessageModel(MessageModel.CLUSTERING);

scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {//註冊topic的監聽器

@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {

long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;

PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.printf("%s%n", offset + "\\t" + mq + "\\t" + pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
}
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//上報消費的offset,消費完後要主動上報

context.setPullNextDelayTimeMillis(100);//設置下一次觸發間隔
} catch (Exception e) {
e.printStackTrace();
}
}
});

scheduleService.start();
}
}/<code>

 也可以自己手動執行pull,如下面的例子:

<code>public class PullConsumer {
private static final Map<messagequeue> OFFSE_TABLE = new HashMap<messagequeue>();

public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

consumer.start();


Set<messagequeue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

consumer.shutdown();
}

private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;

return 0;
}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}

}/<messagequeue>/<messagequeue>/<messagequeue>/<code>

 相較於Push模式,Pull模式則需要用戶自己控制消息的重試,offset更新等動作。下面附上該部分當時源碼閱讀過程做的筆記簡圖:

RocketMQ Consumer接收消息流程


分享到:


相關文章: