從moquette源碼看IOT接入協議MQTT的實現

背景

閱讀優秀的代碼是一種享受,將優秀的代碼用自己的世界觀優秀地描述出來就十分痛苦了是要死一億個腦細胞的。

這篇源碼閱讀筆記早在一年前就有了當時只是簡單的記錄一下自己的總結,最近將她重新整理一下希望能幫助有需要的人。

隨著移動互聯網快速進入後半場,越來越多的企業將注意力轉移到物聯網。比如共享單車和小米的智能家居產品等都是典型的物聯網應用。

企業相信藉助於大數據和AI技術可以獲得很多額外的價值產生新的商業模式。海量數據需要通過接入服務才能流向後端產生後續價值,在接入服務中MQTT已成為物聯網中非明確的標準協議國內外雲廠均有其broker實現。

  • 百度雲
  • 阿里雲
  • 騰訊雲
  • AWS

特性

MQTT協議是為大量計算能力有限,且工作在低帶寬、不可靠的網絡的遠程傳感器和控制設備通訊而設計的協議,它具有以下主要的幾項特性:

  1. 使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合
  2. 對負載內容屏蔽的消息傳輸
  3. 使用 TCP/IP 提供網絡連接
  4. 有三種消息發佈服務質量
  • “至多一次”,消息發佈完全依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久後還會有第二次發送。
  • “至少一次”,確保消息到達,但消息重複可能會發生。
  • “只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重複或丟失會導致不正確的結果。
  1. 小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以降低網絡流量
  2. 使用 Last Will (遺囑)和 Testament 特性通知有關各方客戶端異常中斷的機制

==下文中會對上述特性的實現方式進行講解==

術語

從moquette源碼看IOT接入協議MQTT的實現

客戶端Client

使用MQTT的程序或者設備,如環境監控傳感器、共享單車、共享充電寶等。

服務端Server

一個程序或設備,作為發送消息的客戶端和請求訂閱的客戶端之間的中介。

發佈、訂閱流程

客戶端-A 給 客戶端-B 發送消息“hello”流程如下:

  1. 客戶端-B訂閱名稱為msg的主題
  2. 客戶端-A向服務端-Server發送“hello”,並指明發送給名為msg的主題
  3. 服務端-Server向客戶端-B轉發消息“hello”

有別於HTTP協議的請求響應模式,客戶端-A與客戶端-B不發生直接連接關係,他們之間的消息傳遞通過服務端Server進行轉發。

服務端Server又稱

MQTT Broker 即訂閱和發送的中間人

基於moquette源碼的特性實現分析

在上述的客戶端-A 給 客戶端-B 發送消息“hello”流程中需要有如下動作。

  1. 客戶端-A 、客戶端-B 連接到服務端Server
  2. 客戶端-B 訂閱主題
  3. 客戶端-A 發佈消息
  4. 服務端Server 轉發消息
  5. 客戶端-B 收到消息

下面將基於連接、訂閱、發佈這幾個動作進行源碼跟蹤解讀。

連接

從moquette源碼看IOT接入協議MQTT的實現

MQTT-連接.png

基本概念:

Session:會話即客戶端(由ClientId作為標示)和服務端之間邏輯層面的通信;生命週期(存在時間):會話 >= 網絡連接。

ClientID:客戶端唯一標識,服務端用於關聯一個Session

只能包含這些 大寫字母,小寫字母 和 數字(0-9a-zA-Z),23個字符以內

如果 ClientID 在多次 TCP連接中保持一致,客戶端和服務器端會保留會話信息(Session)

同一時間內 Server 和同一個 ClientID 只能保持一個 TCP 連接,再次連接會踢掉前一個。

CleanSession:在Connect時,由客戶端設置

  • 0 開啟會話重用機制。網絡斷開重連後,恢復之前的Session信息。需要客戶端和服務器有相關Session持久化機制;
  • 1 關閉會話重用機制。每次Connect都是一個新Session,會話僅持續和網絡連接同樣長的時間。

Keep Alive:目的是保持長連接的可靠性,以及雙方對彼此是否在線的確認。

客戶端在Connect的時候設置 Keep Alive 時長。如果服務端在 1.5 * KeepAlive 時間內沒有收到客戶端的報文,它必須斷開客戶端的網絡連接

Keep Alive 的值由具體應用指定,一般是幾分鐘。允許的最大值是 18 小時 12 分 15 秒。

Will:遺囑消息(Will Message)存儲在服務端,當網絡連接關閉時,服務端必須發佈這個遺囑消息,所以被形象地稱之為遺囑,可用於通知異常斷線。

客戶端發送 DISCONNECT 關閉鏈接,遺囑失效並刪除

遺囑消息發佈的條件,包括:

服務端檢測到了一個 I/O 錯誤或者網絡故障

客戶端在保持連接(Keep Alive)的時間內未能通訊

客戶端沒有先發送 DISCONNECT 報文直接關閉了網絡連接

由於協議錯誤服務端關閉了網絡連接

相關設置項,需要在Connect時,由客戶端指定。

Will Flag :遺囑的總開關

  • 0 關閉遺囑功能,Will QoS 和 Will Retain 必須為 0
  • 1 開啟遺囑功能,需要設置 Will Retain 和 Will QoS

Will QoS: 遺囑消息 QoS可取值 0、1、2,含義與消息QoS相同

Will Retain:遺囑是否保留

  • 0 遺囑消息不保留,後面再訂閱不會收到消息
  • 1 遺囑消息保留,持久存儲

Will Topic:遺囑話題

Will Payload:遺囑消息內容

連接流程

  1. 判斷客戶端連接時發送的MQTT協議版本號,非3.1和3.1.1版本發送協議不支持響應報文並在發送完成後關閉連接
  2. 在客戶端配置了cleanSession=false 或者服務端不允許clientId不存在的情況下客戶端如果未上傳clientId發送協議不支持響應報文並在發送完成後關閉連接
  3. 判斷用戶名和密碼是否合法
  4. 初始化連接對象並將連接對象引用放入連接管理中,如果發現連接管理中存在相同客戶端ID的對象則關閉前一個連接並將新的連接對象放入連接管理中
  5. 根據客戶端上傳的心跳時間調整服務端當前連接的心跳判斷時間(keepAlive * 1.5f)
  6. 遺囑消息存儲(當連接意外斷開時向存儲的主題發佈消息)
  7. 發送連接成功響應
  8. 創建當前連接session
  9. 當cleanSession=false 發送當前session已經存儲的消息
public void processConnect(Channel channel, MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
final String username = payload.userName();
LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, username);
// 1. 判斷客戶端連接時發送的MQTT協議版本號,非3.1和3.1.1版本發送協議不支持響應報文並在發送完成後關閉連接
if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {

MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
LOG.error("MQTT protocol version is not valid. CId={}", clientId);
channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
if (clientId == null || clientId.length() == 0) {
// 2. 在客戶端配置了cleanSession=false 或者服務端不允許clientId不存在的情況下客戶端如果未上傳clientId發送協議不支持響應報文並在發送完成後關閉連接
if (!cleanSession || !this.allowZeroByteClientId) {
MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
LOG.error("MQTT client ID cannot be empty. Username={}", username);
return;
}
// Generating client id.
clientId = UUID.randomUUID().toString().replace("-", "");
LOG.info("Client has connected with server generated id={}, username={}", clientId, username);
}
// 3. 判斷用戶名和密碼是否合法
if (!login(channel, msg, clientId)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
//4.初始化連接對象並將連接對象引用放入連接管理中,如果發現連接管理中存在相同客戶端ID的對象則關閉前一個連接並將新的連接 對象放入連接管理中
ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
if (existing != null) {
LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
existing.abort();
//return;
this.connectionDescriptors.removeConnection(existing);
this.connectionDescriptors.addConnection(descriptor);
}
if (!descriptor.assignState(DISCONNECTED, INIT_SESSION)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;

}
LOG.debug("Initializing client session {}", clientId);
ClientSession existingSession = this.sessionsRepository.sessionForClient(clientId);
boolean isSessionAlreadyStored = existingSession != null;
final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession();
if (isSessionAlreadyStored && msgCleanSessionFlag) {
for (Subscription existingSub : existingSession.getSubscriptions()) {
this.subscriptions.removeSubscription(existingSub.getTopicFilter(), clientId);
}
}
// 5. 根據客戶端上傳的心跳時間調整服務端當前連接的心跳判斷時間(keepAlive * 1.5f)
initializeKeepAliveTimeout(channel, msg, clientId);
final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
// 6. 遺囑消息存儲(當連接意外斷開時向存儲的主題發佈消息)
clientSession.storeWillMessage(msg, clientId);
int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
setupAutoFlusher(channel, flushIntervalMs);
if (!cleanSession && reauthorizeSubscriptionsOnConnect) {
reauthorizeOnExistingSubscriptions(clientId, username);
}
if (!descriptor.assignState(INIT_SESSION, SENDACK)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
LOG.debug("Sending CONNACK. CId={}", clientId);
MqttConnAckMessage okResp = createConnectAck(msg, clientId);
final String connectClientId = clientId;
// 7. 發送連接成功響應
descriptor.writeAndFlush(okResp, new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
LOG.debug("CONNACK has been sent. CId={}", connectClientId);
if (!descriptor.assignState(SENDACK, MESSAGES_REPUBLISHED)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
m_interceptor.notifyClientConnected(msg);
if (!msg.variableHeader().isCleanSession()) {
// force the republish of stored QoS1 and QoS2
internalRepublisher.publishStored(clientSession);
}
if (!descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED)) {
channel.close().addListener(CLOSE_ON_FAILURE);

}
LOG.info("Connected client with login ", connectClientId, username);
} else {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
});
}
private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
return msg.variableHeader().version() != version.protocolLevel();
}

訂閱

從moquette源碼看IOT接入協議MQTT的實現

MQTT-訂閱.png

基本概念

訂閱流程

  1. 訂閱的主題校驗(權限、主題path合法性)
  2. 在當前session中存儲訂閱的主題
  3. 採用全局tree結構存儲訂閱信息(主題和訂閱者信息),用於消息轉發時根據主題查找到對應的訂閱者(tree結構和查找算法下一章節中介紹
  4. 發送訂閱回應
  5. 掃描持久化的消息匹配到當前訂閱主題的立即向此連接發送消息
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);
RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
if (currentStatus != null) {
LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
clientID, messageID);
return;
}
String username = NettyUtils.userName(channel);
// 1、訂閱的主題校驗(權限、主題path合法性)
List ackTopics = doVerify(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);

if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
"messageId={}", clientID, messageID);
return;
}
LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
// 2、在當前session中存儲訂閱的主題
List newSubscriptions = doStoreSubscription(ackTopics, clientID);
// save session, persist subscriptions from session
// 3、採用全局tree結構存儲訂閱信息(主題和訂閱者信息),用於消息轉發時根據主題查找到對應的訂閱者
for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription);
}
LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
// 4、發送訂閱回應
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
// fire the persisted messages in session
// 5、掃描持久化的消息匹配到當前訂閱主題的立即向此連接發送消息
for (Subscription subscription : newSubscriptions) {
publishRetainedMessagesInSession(subscription, username);
}
boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
if (!success) {
LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
} else {
LOG.info("Client subscribed to topics", clientID);
}
}

發佈

基本概念

Packet Identifier:報文標識存在報文的可變報頭部分,非零兩個字節整數 (0-65535]。

一個流程中重複:這些報文包含PacketID,而且在一次通信流程內保持一致:PUBLISH(QoS>0時)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。

新的不重複:客戶端每次發送一個新的這些類型的報文時都必須分配一個當前 未使用的PacketID

當客戶端處理完這個報文對應的確認後,這個報文標識符就釋放可重用。

獨立維護:客戶端和服務端彼此獨立地分配報文標識符。因此,客戶端服務端組合使用相同的報文標識符可以實現併發的消息交換。客戶端和服務端產生的Packet Identifier一致不算異常。

Payload: 有效載荷即消息體最大允許 256MB。

Publish 的 Payload 允許為空,在很多場合下代表將持久消息(或者遺囑消息)清空。採用UTF-8編碼。

Retain:持久消息(粘性消息)

RETAIN 標記:每個Publish消息都需要指定的標記

  • 0 服務端不能存儲這個消息,也不能移除或替換任何 現存的保留消息
  • 1 服務端必須存儲這個應用消息和它的QoS等級,以便它可以被分發給未來的訂閱者

每個Topic只會保留最多一個 Retain 持久消息

客戶端訂閱帶有持久消息的Topic,會立即受到這條消息。

服務器可以選擇丟棄持久消息,比如內存或者存儲吃緊的時候。

如果客戶端想要刪除某個Topic 上面的持久消息,可以向這個Topic發送一個Payload為空的持久消息

遺囑消息(Will)的Retain持久機制同理。

QoS :服務等級(消息可靠性)

發佈流程

public void processPublish(Channel channel, MqttPublishMessage msg) {
final MqttQoS qos = msg.fixedHeader().qosLevel();
final String clientId = NettyUtils.clientID(channel);

LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
switch (qos) {
case AT_MOST_ONCE:
this.qos0PublishHandler.receivedPublishQos0(channel, msg);
break;
case AT_LEAST_ONCE:
this.qos1PublishHandler.receivedPublishQos1(channel, msg);
break;
case EXACTLY_ONCE:
this.qos2PublishHandler.receivedPublishQos2(channel, msg);
break;
default:
LOG.error("Unknown QoS-Type:{}", qos);
break;
}
}

從上述代碼的switch語句中可以看出會根據消息的Qos級別分別進行處理

QoS0 最多一次

從moquette源碼看IOT接入協議MQTT的實現

  1. 權限判斷
  2. 向所有該主題的訂閱者發佈消息
  3. QoS == 0 && retain => clean old retained
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 權限判斷
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向所有該主題的訂閱者發佈消息
this.publisher.publish2Subscribers(toStoreMsg, topic);
if (msg.fixedHeader().isRetain()) {
// 3. QoS == 0 && retain => clean old retained
m_messagesStore.cleanRetained(topic);
}
m_interceptor.notifyTopicPublished(msg, clientID, username);
}

QoS1 至少一次

從moquette源碼看IOT接入協議MQTT的實現

1.發送消息PUBLISH

  1. 權限判斷
  2. 向所有該主題的訂閱者發佈消息(每個session中存儲即將要發送的消息)
  3. 發送Ack回應
  4. retain = true => 存儲消息
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
topic.getTokens();
if (!topic.isValid()) {
LOG.warn("Invalid topic format, force close the connection");
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 權限判斷
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向所有該主題的訂閱者發佈消息(每個session中存儲即將要發送的消息)
this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);
// 3. 發送Ack回應
sendPubAck(clientID, messageID);
// 4. retain = true => 存儲消息
if (msg.fixedHeader().isRetain()) {

if (!msg.payload().isReadable()) {
m_messagesStore.cleanRetained(topic);
} else {
// before wasn't stored
m_messagesStore.storeRetained(topic, toStoreMsg);
}
}
m_interceptor.notifyTopicPublished(msg, clientID, username);
}

2.1發送消息回應PUBACK

服務端Server接收到PUBACK消息後將執行:

  1. 刪除存儲在session中的消息
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = msg.variableHeader().messageId();
String username = NettyUtils.userName(channel);
LOG.trace("retrieving inflight for messageID ", messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);
String topic = inflightMsg.getTopic();
InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
messageID);
m_interceptor.notifyMessageAcknowledged(wrapped);
}

QoS2 有且僅有一次

從moquette源碼看IOT接入協議MQTT的實現

1.發送消息PUBLISH

  1. 權限判斷
  2. 存儲消息
  3. 發送Rec回應
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
final Topic topic = new Topic(msg.variableHeader().topicName());
// check if the topic can be wrote
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 權限判斷
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// 2. 存儲消息
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
if (LOG.isTraceEnabled()) {
LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
}
this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);
// 3. 發送Rec回應
sendPubRec(clientID, messageID);
// Next the client will send us a pub rel
// NB publish to subscribers for QoS 2 happen upon PUBREL from publisher
// if (msg.fixedHeader().isRetain()) {
// if (msg.payload().readableBytes() == 0) {
// m_messagesStore.cleanRetained(topic);
// } else {
// m_messagesStore.storeRetained(topic, toStoreMsg);
// }
// }
//TODO this should happen on PUB_REL, else we notify false positive
m_interceptor.notifyTopicPublished(msg, clientID, username);
}

2.發送消息Rel

  1. 刪除消息
  2. 轉發消息
  3. 發送Comp 回應給客戶端-A
void processPubRel(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 刪除消息
IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
if (evt == null) {
LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
throw new IllegalArgumentException("Can't find inbound inflight message");
}
final Topic topic = new Topic(evt.getTopic());
// 2. 轉發消息
this.publisher.publish2Subscribers(evt, topic, messageID);
if (evt.isRetained()) {
if (evt.getPayload().readableBytes() == 0) {
m_messagesStore.cleanRetained(topic);
} else {
m_messagesStore.storeRetained(topic, evt);
}
}
//TODO here we should notify to the listeners
//m_interceptor.notifyTopicPublished(msg, clientID, username);
// 3.發送Comp 回應
sendPubComp(clientID, messageID);
}

3.發送消息回應Rec

  1. 刪除消息
  2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
  3. 發送PUBREL
public void processPubRec(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// remove from the inflight and move to the QoS2 second phase queue
// 1. 刪除消息
StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
// 2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
// once received a PUBREC reply with a PUBREL(messageID)
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
// 3. 發送PUBREL
MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}

3.4發送消息回應Comp

  1. 刪除消息
public void processPubComp(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
// once received the PUBCOMP then remove the message from the temp memory
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 刪除消息
StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
String username = NettyUtils.userName(channel);
String topic = inflightMsg.getTopic();
final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
username, messageID);
m_interceptor.notifyMessageAcknowledged(interceptAckMsg);

}

Topic & Subcribe

基本概念

Topic 話題 和 TopicFilter 話題過濾器

Pub-Sub消息模型的核心機制

UTF-8 編碼字符串,不能超過 65535 字節。層級數量沒有限制

不能包含任何的下文中提到的特殊符號(/、+、#),必須至少包含一個字符

區分大小寫,可以包含空格,不能包含空字符 (Unicode U+0000)

在收部或尾部增加 斜槓 “/”,會產生不同的Topic和TopicFilter。舉例:

  • “/A” 和 “A” 是不同的
  • “A” 和 “A/” 是不同的

只包含斜槓 “/” 的 Topic 或 TopicFilter 是合法的

TopicFilter中的特殊符號

層級分隔符 /

用於分割主題的每個層級,為主題名提供一個分層結構

主題層級分隔符可以出現在 Topic 或 TopicFilter 的任何位置

特例:相鄰的主題層次分隔符表示一個零長度的主題層級

單層通配符 +

只能用於單個主題層級匹配的通配符。例如,“a/b/+” 匹配 “a/b/c1” 和 “a/b/c2” ,但是不匹配 “a/b/c/d”

可以匹配 任意層級,包括第一個和最後一個層級。

例如,“+” 是有效的,“sport/+/player1” 也是有效的。

可以在多個層級中使用它,也可以和多層通配符一起使用。

例如,“+/tennis/#” 是有效的。只能匹配本級不能匹配上級。

例如,“sport/+” 不匹配 “sport” 但是卻匹配“sport/”,“/finance” 匹配 “+/+” 和 “/+” ,但是不匹配 “+”。

多層通配符 #

用於匹配主題中任意層級的通配符

匹配包含本身的層級和子層級。

例如 “a/b/c/#" 可以匹配 “a/b/c”、“a/b/c/d” 和 “a/b/c/d/e”

必須是最後的結尾。

例如 “sport/tennis/#/ranking”是無效的

“#”是有效的,會收到所有的應用消息。 (服務器端應將此類 TopicFilter禁掉 )

以$開頭的,服務器保留

服務端不能將 $ 字符開頭的 Topic 匹配通配符 (#或+) 開頭的 TopicFilter

服務端應該阻止客戶端使用這種 Topic 與其它客戶端交換消息。

服務端實現可以將 $ 開頭的主題名用作其他目的。

SYS/ 被廣泛用作包含服務器特定信息或控制接口的主題的前綴 客戶端不特意訂閱開頭的 Topic,就不會收到對應的消息

  • 訂閱 “#” 的客戶端不會收到任何發佈到以 “$” 開頭主題的消息
  • 訂閱 “+/A/B” 的客戶端不會收到任何發佈到 “$SYS/A/B” 的消息
  • 訂閱 “SYS/#” 的客戶端會收到發佈到以 “SYS/” 開頭主題的消息
  • 訂閱 “SYS/A/+” 的客戶端會收到發佈到 “SYS/A/B” 主題的消息

如果客戶端想同時接受以 “SYS/” 開頭主題的消息和不以 開頭主題的消息,它需要同時 訂閱 “#” 和 “$SYS/#”

存儲結構

  • a/b/c
  • a/a
  • a/haha
  • msg

這4個主題會存儲成如下結構:

  1. children 指向下層節點
  2. subscriptions 存儲當前主題所有的訂閱者
從moquette源碼看IOT接入協議MQTT的實現

查找算法

訂閱

@Override
public void add(Subscription newSubscription) {
Action res;
do {
res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
} while (res == Action.REPEAT);
}
private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
Token token = topic.headToken();
if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return insert(clientId, remainingTopic, nextInode, fullpath);
} else {
if (topic.isEmpty()) {
return insertSubscription(clientId, fullpath, inode);
} else {
return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
}
}
}

刪除訂閱

/**
* Removes subscription from CTrie, adds TNode when the last client unsubscribes, then calls for cleanTomb in a
* separate atomic CAS operation.
*
* @param topic
* @param clientID
*/
@Override
public void removeSubscription(Topic topic, String clientID) {
Action res;
do {
res = remove(clientID, topic, this.root, NO_PARENT);

} while (res == Action.REPEAT);
}
private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
Token token = topic.headToken();
if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return remove(clientId, remainingTopic, nextInode, inode);
} else {
final CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
if (inode == this.root) {
return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
}
TNode tnode = new TNode();
return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
} else if (cnode.contains(clientId) && topic.isEmpty()) {
CNode updatedCnode = cnode.copy();
updatedCnode.removeSubscriptionsFor(clientId);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
//someone else already removed
return Action.OK;
}
}
}

查找

Set recursiveMatch(Topic topic, INode inode) {
CNode cnode = inode.mainNode();
if (Token.MULTI.equals(cnode.token)) {
return cnode.subscriptions;
}
if (topic.isEmpty()) {
return Collections.emptySet();
}
if (cnode instanceof TNode) {
return Collections.emptySet();

}
final Token token = topic.headToken();
if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
return Collections.emptySet();
}
Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
Set subscriptions = new HashSet<>();
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
}
for (INode subInode : cnode.allChildren()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
}
return subscriptions;
}


分享到:


相關文章: